[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement 
ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191
 
 
   
   ## CI report:
   
   * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133366796)
   * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/133940607)
   * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/133998545)
   * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/134010321)
   * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/137145261)
   * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138017410)
   * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138730857)
   * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/138770610)
   * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139116897)
   * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139256990)
   * ce01f742aba3a683b98a0ea7da47e678d30c12be : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139290769)
   * 7392ac618203c2544ab4a313a7068e03fcc147a7 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139453237)
   * 3b3e23e69bde65f504ea5d46ef7c2d55ba04c0af : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139459406)
   * 71c087913517972655181e0119588c886e4243cb : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139461705)
   * 9be9f9f2338a2a953a76cc60057554d7d90157f9 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139518666)
   * bb01ae34a8d6087f4aef4a989c0d14d576df601f : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139523468)
   * fd9cdf56812f2d0c9d659b69d9fb243d58ef48a3 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139555865)
   * c647669902f574c3ee23adc7efbabc859d93c0e4 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139627616)
   * cc29fb3058998b1489451d53ad04d9e9886f4193 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139635222)
   * 3d315606bd4fdf826ef823631116a189810dab6a : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139641767)
   * 37bfdff9aa766deaac10d9c5407cc244d14d1085 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139643868)
   * daa0ff528a8ec3c2dcbd33927099cee2d8faa113 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/139650255)
   * 17ba1baf2383844c1b0ca8aa8ca97a80646e77fb : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139653134)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10455: [FLINK-15089][connectors] Pulsar catalog

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10455: [FLINK-15089][connectors] Pulsar 
catalog
URL: https://github.com/apache/flink/pull/10455#issuecomment-562462991
 
 
   
   ## CI report:
   
   * 206bb09e47ff4f36514a93c96f7cadc30f62cd4b : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/139653117)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354702929
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link KubernetesResourceManager}.
+ */
+public class KubernetesResourceManagerTest extends KubernetesTestBase {
+
+   private static final Time TIMEOUT = Time.seconds(10L);
+
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+
+   private final String jobManagerHost = "jm-host1";
+
+   private Configuration flinkConfig;
+
+   private TestingKubernetesResourceManager resourceManager;
+
+   private Fli

[GitHub] [flink] flinkbot commented on issue #10458: [FLINK-14815][rest]Expose network metric in IOMetricsInfo

2019-12-05 Thread GitBox
flinkbot commented on issue #10458: [FLINK-14815][rest]Expose network metric in 
IOMetricsInfo
URL: https://github.com/apache/flink/pull/10458#issuecomment-562472069
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 303f921f58ff61aa3e332c36ffdc07d5f9ceb352 (Fri Dec 06 
07:57:08 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TsReaper commented on issue #10306: [FLINK-13943][table-api] Provide utility method to convert Flink table to Java List

2019-12-05 Thread GitBox
TsReaper commented on issue #10306: [FLINK-13943][table-api] Provide utility 
method to convert Flink table to Java List
URL: https://github.com/apache/flink/pull/10306#issuecomment-562471647
 
 
   Travis passed: https://travis-ci.com/TsReaper/flink/builds/139638323


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wsry commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.

2019-12-05 Thread GitBox
wsry commented on a change in pull request #10375: [FLINK-14845][runtime] 
Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r354702467
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 ##
 @@ -54,6 +54,28 @@
 			.withDescription("Enable SSL support for the taskmanager data transport. This is applicable only when the" +
 				" global flag for internal SSL (" + SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
 
+	/**
+	 * Boolean flag indicating whether the shuffle data will be compressed or not.
+	 *
+	 * Note: Data is compressed per buffer (may be sliced buffer in pipeline mode) and compression can incur extra
+	 * CPU overhead so it is more effective for IO bounded scenario when data compression ratio is high.
+	 */
+	public static final ConfigOption<Boolean> DATA_COMPRESSION_ENABLED =
+		key("taskmanager.data.compression.enabled")
 
 Review comment:
   I am a little afraid that when we make it pluggable in the future, the "LZ4" 
config value may be dropped, and users already using compression may have to 
change their configurations. Currently, the codec config option is not 
documented, so we are free to change its value later, once it becomes 
pluggable.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-14815) Expose network metric in IOMetricsInfo

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-14815:
---
Labels: pull-request-available  (was: )

> Expose network metric in IOMetricsInfo
> --
>
> Key: FLINK-14815
> URL: https://issues.apache.org/jira/browse/FLINK-14815
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Metrics, Runtime / Network, Runtime / REST
>Reporter: lining
>Assignee: lining
>Priority: Major
>  Labels: pull-request-available
>
> * SubTask
>  ** pool usage: outPoolUsage, inputExclusiveBuffersUsage, 
> inputFloatingBuffersUsage.
>  *** If the subtask is not back pressured but is causing backpressure 
> (full input, empty output).
>  *** By comparing exclusive/floating buffers usage, tell whether all 
> channels are back pressured or only some of them.
>  ** back-pressured: shows whether the subtask is back pressured.
>  * Vertex
>  ** pool usage: outPoolUsageAvg, inputExclusiveBuffersUsageAvg, 
> inputFloatingBuffersUsageAvg.
>  ** back-pressured: shows whether the vertex is back pressured (merged 
> over all its subtasks).
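The heuristic sketched in the description — a subtask with a full input pool 
but an empty output pool is likely the cause of backpressure, and vertex-level 
values merge the subtasks — could look roughly like this. The thresholds and 
the "any subtask" merge rule are assumptions for illustration, not Flink's 
actual logic:

```python
def is_causing_backpressure(input_pool_usage: float, out_pool_usage: float,
                            full: float = 0.95, empty: float = 0.05) -> bool:
    """Full input + empty output suggests this subtask is the bottleneck
    (it is not back pressured itself, but it back-pressures its upstream)."""
    return input_pool_usage >= full and out_pool_usage <= empty


def vertex_avg(subtask_values):
    """Vertex-level metrics such as outPoolUsageAvg average over subtasks."""
    return sum(subtask_values) / len(subtask_values)


def vertex_backpressured(subtask_flags):
    """Assumed merge rule: the vertex is back pressured if any subtask is."""
    return any(subtask_flags)


assert is_causing_backpressure(0.98, 0.01)       # full input, empty output
assert not is_causing_backpressure(0.98, 0.50)   # output still draining
assert vertex_avg([0.0, 1.0]) == 0.5
```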



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] jinglining opened a new pull request #10458: [FLINK-14815][rest]Expose network metric in IOMetricsInfo

2019-12-05 Thread GitBox
jinglining opened a new pull request #10458: [FLINK-14815][rest]Expose network 
metric in IOMetricsInfo
URL: https://github.com/apache/flink/pull/10458
 
 
   
   
   ## What is the purpose of the change
   
   - Expose network metric in IOMetricsInfo
   - JobVertex exposes is-backpressed, out-pool-usage-avg, input-exclusive-buffers-usage-avg, input-floating-buffers-usage-avg.
   - SubTask exposes is-backpressed, out-pool-usage, input-exclusive-buffers-usage, input-floating-buffers-usage.
   
   
   ## Brief change log
   
   - calculate in MutableIOMetrics#addIOMetrics
   - add SubTaskIOMetricsInfo for subtask's metrics in rest api.
   - add JobVertexIOMetricsInfo for job vertex's metrics in rest api.
   
   
   ## Verifying this change
   
   
   
   This change is already covered by existing tests, such as 
SubtaskCurrentAttemptDetailsHandlerTest, SubtaskExecutionAttemptDetailsHandlerTest, 
JobDetailsInfoTest, SubtaskExecutionAttemptDetailsInfoTest, and JobVertexDetailsInfoTest.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10422: [FLINK-15057][tests] Set JM and TM memory config in flink-conf.yaml

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10422: [FLINK-15057][tests] Set JM and TM 
memory config in flink-conf.yaml
URL: https://github.com/apache/flink/pull/10422#issuecomment-561743112
 
 
   
   ## CI report:
   
   * 88f6f7790d8bb0b517958be4446e95146605c369 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/139377231)
   * 5c4f99c16f800edf9f8f9de5889be7e7ee7b297a : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/139563662)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354701520
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link KubernetesResourceManager}.
+ */
+public class KubernetesResourceManagerTest extends KubernetesTestBase {
+
+   private static final Time TIMEOUT = Time.seconds(10L);
+
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+
+   private final String jobManagerHost = "jm-host1";
+
+   private Configuration flinkConfig;
+
+   private TestingKubernetesResourceManager resourceManager;
+
+   private FlinkK

[jira] [Commented] (FLINK-15087) JobManager is forced to shutdown JVM due to temporary loss of zookeeper connection

2019-12-05 Thread lamber-ken (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989482#comment-16989482
 ] 

lamber-ken commented on FLINK-15087:


Hi, this may be a duplicate of FLINK-10052.

> JobManager is forced to shutdown JVM due to temporary loss of zookeeper 
> connection
> --
>
> Key: FLINK-15087
> URL: https://issues.apache.org/jira/browse/FLINK-15087
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.8.2
>Reporter: Abdul Qadeer
>Priority: Major
>
> While testing I found that the loss of connection with zookeeper triggers JVM 
> shutdown for Job Manager, when started through 
> "StandaloneSessionClusterEntrypoint". This happens due to a NPE on 
> "taskManagerHeartbeatManager."
> When JobManagerRunner suspends jobMasterService (as Job manager is no longer 
> leader), "taskManagerHeartbeatManager" is set to null in 
> "stopHeartbeatServices".
> Next, "AkkaRpcActor" stops JobMaster and throws NPE in the following method:
> {code:java}
> @Override
> public CompletableFuture<Acknowledge> disconnectTaskManager(final ResourceID resourceID, final Exception cause) {
> 	log.debug("Disconnect TaskExecutor {} because: {}", resourceID, cause.getMessage());
> 	taskManagerHeartbeatManager.unmonitorTarget(resourceID);
> 	slotPool.releaseTaskManager(resourceID, cause);
> {code}
>  
> This leads to a fatal error finally in "ClusterEntryPoint.onFatalError()" and 
> forces JVM shutdown.
> The stack trace is below:
>  
> {noformat}
> {"timeMillis":1575581120723,"thread":"flink-akka.actor.default-dispatcher-93","level":"ERROR","loggerName":"com.Sample","message":"Failed
>  to take leadership with session id 
> b4662db5-f065-41d9-aaaf-78625355b251.","thrown":{"commonElementCount":0,"localizedMessage":"Failed
>  to take leadership with session id 
> b4662db5-f065-41d9-aaaf-78625355b251.","message":"Failed to take leadership 
> with session id 
> b4662db5-f065-41d9-aaaf-78625355b251.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":18,"localizedMessage":"Termination
>  of previous JobManager for job bbb8c430787d92293e9d45c349231d9c failed. 
> Cannot submit job under the same job id.","message":"Termination of previous 
> JobManager for job bbb8c430787d92293e9d45c349231d9c failed. Cannot submit job 
> under the same job 
> id.","name":"org.apache.flink.runtime.dispatcher.DispatcherException","cause":{"commonElementCount":6,"localizedMessage":"org.apache.flink.util.FlinkException:
>  Could not properly shut down the 
> JobManagerRunner","message":"org.apache.flink.util.FlinkException: Could not 
> properly shut down the 
> JobManagerRunner","name":"java.util.concurrent.CompletionException","cause":{"commonElementCount":6,"localizedMessage":"Could
>  not properly shut down the JobManagerRunner","message":"Could not properly 
> shut down the 
> JobManagerRunner","name":"org.apache.flink.util.FlinkException","cause":{"commonElementCount":13,"localizedMessage":"Failure
>  while stopping RpcEndpoint jobmanager_0.","message":"Failure while stopping 
> RpcEndpoint 
> jobmanager_0.","name":"org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException","cause":{"commonElementCount":13,"name":"java.lang.NullPointerException","extendedStackTrace":[{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"disconnectTaskManager","file":"JobMaster.java","line":629,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.jobmaster.JobMaster","method":"onStop","file":"JobMaster.java","line":346,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":504,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":"AkkaRpcActor.java","line":142,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"}]},"extendedStackTrace":[{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState","method":"terminate","file":"AkkaRpcActor.java","line":508,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"handleControlMessage","file":"AkkaRpcActor.java","line":170,"exact":false,"location":"flink-runtime_2.11-1.8.2.jar","version":"1.8.2"},{"class":"org.apache.flink.runtime.rpc.akka.AkkaRpcActor","method":"onReceive","file":

[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354672644
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -183,13 +213,19 @@
STOP_AND_DRAIN.setRequired(false);
 
PY_OPTION.setRequired(false);
-   PY_OPTION.setArgName("python");
+   PY_OPTION.setArgName("pythonFile");
 
PYFILES_OPTION.setRequired(false);
-   PYFILES_OPTION.setArgName("pyFiles");
+   PYFILES_OPTION.setArgName("pythonFiles");
 
PYMODULE_OPTION.setRequired(false);
PYMODULE_OPTION.setArgName("pyModule");
+
+   PYREQUIREMENTS_OPTION.setRequired(false);
+
+   PYARCHIVE_OPTION.setRequired(false);
 
 Review comment:
   PYARCHIVE_OPTION.setArgName("pyArchives")


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354685811
 
 

 ##
 File path: flink-python/pyflink/common/tests/test_python_option.py
 ##
 @@ -0,0 +1,130 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import tempfile
+
+from py4j.protocol import Py4JError
+from pyflink.util.utils import to_jarray
+
+from pyflink.common import Configuration
+from pyflink.java_gateway import launch_gateway
+
+from pyflink.common.dependency_manager import DependencyManager
+
+from pyflink.testing.test_case_utils import (PyFlinkTestCase, replace_uuid,
+ TestEnv, encode_to_base64,
+ create_empty_file)
+
+
+class PythonOptionTests(PyFlinkTestCase):
+
+    def setUp(self):
+        self.j_env = TestEnv()
+        self.config = Configuration()
+        self.dependency_manager = DependencyManager(self.config, self.j_env)
+
+    def test_python_options(self):
+        dm = DependencyManager
+
+        system_env = dict()
+        system_env[dm.PYFLINK_PY_FILES] = "/file1.py\nhdfs://file2.zip\nfile3.egg"
+        system_env[dm.PYFLINK_PY_REQUIREMENTS] = "a.txt\nb_dir"
+        system_env[dm.PYFLINK_PY_EXECUTABLE] = "/usr/local/bin/python"
+        system_env[dm.PYFLINK_PY_ARCHIVES] = "/py3.zip\nvenv\n/py3.zip\n\ndata.zip\ndata"
+
+        self.dependency_manager.load_from_env(system_env)
+
+        configs = self.config.to_dict()
+        python_files = replace_uuid(json.loads(configs[dm.PYTHON_FILES]))
+        python_requirements_file = replace_uuid(configs[dm.PYTHON_REQUIREMENTS_FILE])
+        python_requirements_cache = replace_uuid(configs[dm.PYTHON_REQUIREMENTS_CACHE])
+        python_archives = replace_uuid(json.loads(configs[dm.PYTHON_ARCHIVES]))
+        python_exec = configs[dm.PYTHON_EXEC]
+        registered_files = replace_uuid(self.j_env.to_dict())
+
+        self.assertEqual(
+            {"python_file_0_{uuid}": "file1.py",
+             "python_file_1_{uuid}": "file2.zip",
+             "python_file_2_{uuid}": "file3.egg"}, python_files)
+        self.assertEqual(
+            {"python_archive_3_{uuid}": "venv",
+             "python_archive_4_{uuid}": "py3.zip",
+             "python_archive_5_{uuid}": "data"}, python_archives)
+        self.assertEqual("python_requirements_file_6_{uuid}", python_requirements_file)
+        self.assertEqual("python_requirements_cache_7_{uuid}", python_requirements_cache)
+        self.assertEqual("/usr/local/bin/python", python_exec)
+        self.assertEqual(
+            {"python_file_0_{uuid}": "/file1.py",
+             "python_file_1_{uuid}": "hdfs://file2.zip",
+             "python_file_2_{uuid}": "file3.egg",
+             "python_archive_3_{uuid}": "/py3.zip",
+             "python_archive_4_{uuid}": "/py3.zip",
+             "python_archive_5_{uuid}": "data.zip",
+             "python_requirements_file_6_{uuid}": "a.txt",
+             "python_requirements_cache_7_{uuid}": "b_dir"}, registered_files)
+
+    def test_python_options_integrated(self):
+        tmp_dir = tempfile.mkdtemp(dir=self.tempdir)
 
 Review comment:
  tempdir could be used directly; there is no need to create it again. What do you think?




[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354660340
 
 

 ##
 File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -126,14 +126,44 @@
 
	static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
		"Attach custom python files for job. " +
-			"Comma can be used as the separator to specify multiple files. " +
-			"The standard python resource file suffixes such as .py/.egg/.zip are all supported." +
-			"(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)");
+			"These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " +
+			"The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " +
+			"Comma (',') could be used as the separator to specify multiple files " +
+			"(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");
 
	static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
		"Python module with the program entry point. " +
			"This option must be used in conjunction with `--pyFiles`.");
 
+	static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true,
+		"Specify a requirements.txt file which defines the third-party dependencies. " +
+			"These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. " +
+			"A directory which contains the installation packages of these dependencies could be specified" +
 
 Review comment:
  Missing space at the end.




[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354669358
 
 

 ##
 File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -126,14 +126,44 @@
 
	static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
		"Attach custom python files for job. " +
-			"Comma can be used as the separator to specify multiple files. " +
-			"The standard python resource file suffixes such as .py/.egg/.zip are all supported." +
-			"(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)");
+			"These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " +
+			"The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " +
+			"Comma (',') could be used as the separator to specify multiple files " +
+			"(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");
 
	static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
		"Python module with the program entry point. " +
			"This option must be used in conjunction with `--pyFiles`.");
 
+	static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true,
+		"Specify a requirements.txt file which defines the third-party dependencies. " +
+			"These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. " +
+			"A directory which contains the installation packages of these dependencies could be specified" +
+			"via second parameter. Use '#' as the separator of parameters if second parameter exists. " +
 
 Review comment:
   What about "could be specified optionally. Use '#' as the separator if the 
optional parameter exists.  "?
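
  As a side note, the '#' convention discussed in this option text can be sketched as follows (hypothetical helper in Python, not part of the PR; the actual parsing lives in Flink's Java CLI code):

```python
def parse_py_requirements(value):
    """Split a --pyRequirements value into (requirements_file, cached_dir).

    The optional cached-dir parameter is separated by '#'; when it is
    absent, the second element is None.
    """
    if "#" in value:
        requirements_file, cached_dir = value.split("#", 1)
        return requirements_file, cached_dir
    return value, None

print(parse_py_requirements(
    "file:///tmp/requirements.txt#file:///tmp/cached_dir"))
```

  With only a requirements file given, e.g. `parse_py_requirements("file:///tmp/requirements.txt")`, the cached-dir element comes back as None.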




[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354672690
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -183,13 +213,19 @@
STOP_AND_DRAIN.setRequired(false);
 
PY_OPTION.setRequired(false);
-   PY_OPTION.setArgName("python");
+   PY_OPTION.setArgName("pythonFile");
 
PYFILES_OPTION.setRequired(false);
-   PYFILES_OPTION.setArgName("pyFiles");
+   PYFILES_OPTION.setArgName("pythonFiles");
 
PYMODULE_OPTION.setRequired(false);
PYMODULE_OPTION.setArgName("pyModule");
+
+   PYREQUIREMENTS_OPTION.setRequired(false);
+
+   PYARCHIVE_OPTION.setRequired(false);
+
+   PYEXEC_OPTION.setRequired(false);
 
 Review comment:
   PYEXEC_OPTION.setArgName("pyExecutable")




[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354675266
 
 

 ##
 File path: flink-python/pyflink/common/dependency_manager.py
 ##
 @@ -39,6 +39,15 @@ class DependencyManager(object):
 PYTHON_ARCHIVES = "python.archives"
 PYTHON_EXEC = "python.exec"
 
+    # Environment variable names used to store the dependency settings specified via commandline
+    # options.
+    # The "PYFLINK_" prefix indicates that the variable is used to transmit data from external
 
 Review comment:
   Personally I think this paragraph is not necessary.
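
  For readers following the thread: the mechanism the commented paragraph describes can be sketched like this (illustrative Python only; names mirror the constants quoted above, and the newline-separated wire format is inferred from the test expectations earlier in this thread):

```python
PYFLINK_PY_FILES = "PYFLINK_PY_FILES"

def load_py_files_from_env(system_env):
    """Read the newline-separated file list transmitted via the
    PYFLINK_PY_FILES environment variable; empty entries are skipped."""
    raw = system_env.get(PYFLINK_PY_FILES, "")
    return [entry for entry in raw.split("\n") if entry]

files = load_py_files_from_env(
    {PYFLINK_PY_FILES: "/file1.py\nhdfs://file2.zip\nfile3.egg"})
print(files)
```

  This mirrors how the CLI-side Java code fills the environment and the Python-side DependencyManager reads it back.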




[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354672570
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -183,13 +213,19 @@
STOP_AND_DRAIN.setRequired(false);
 
PY_OPTION.setRequired(false);
-   PY_OPTION.setArgName("python");
+   PY_OPTION.setArgName("pythonFile");
 
PYFILES_OPTION.setRequired(false);
-   PYFILES_OPTION.setArgName("pyFiles");
+   PYFILES_OPTION.setArgName("pythonFiles");
 
PYMODULE_OPTION.setRequired(false);
PYMODULE_OPTION.setArgName("pyModule");
+
+   PYREQUIREMENTS_OPTION.setRequired(false);
 
 Review comment:
   PYREQUIREMENTS_OPTION.setArgName("pyRequirements")




[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354677710
 
 

 ##
 File path: flink-python/src/main/java/org/apache/flink/client/python/PythonDriverEnvUtils.java
 ##
 @@ -138,9 +152,34 @@ public static PythonEnvironment preparePythonEnvironment(
	}
 
		env.pythonPath = String.join(File.pathSeparator, pythonPathList);
+
+		if (!pythonDriverOptions.getPyFiles().isEmpty()) {
+			env.systemEnv.put(PYFLINK_PY_FILES, String.join("\n", pythonDriverOptions.getPyFiles()));
+		}
+		if (!pythonDriverOptions.getPyArchives().isEmpty()) {
+			env.systemEnv.put(
+				PYFLINK_PY_ARCHIVES,
+				joinTuples(pythonDriverOptions.getPyArchives().toArray(new Tuple2[0])));
+		}
+		pythonDriverOptions.getPyRequirements().ifPresent(
+			pyRequirements -> env.systemEnv.put(PYFLINK_PY_REQUIREMENTS, joinTuples(pyRequirements)));
+		pythonDriverOptions.getPyExecutable().ifPresent(
+			pyExecutable -> env.systemEnv.put(PYFLINK_PY_EXECUTABLE, pythonDriverOptions.getPyExecutable().get()));
		return env;
	}
 
+	@SafeVarargs
+	private static String joinTuples(Tuple2<String, String>... tuples) {
 
 Review comment:
  What about changing the signature to "Collection<Tuple2<String, String>> tuples"?
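
  The joinTuples behavior under discussion can be sketched in a few lines (Python used for illustration only; the '\n' flattening format is inferred from the test expectations earlier in this thread):

```python
def join_tuples(tuples):
    """Flatten (path, target) pairs into the newline-separated wire
    format carried by the PYFLINK_* environment variables."""
    return "\n".join(element for pair in tuples for element in pair)

print(join_tuples([("file:///tmp/requirements.txt", "file:///tmp/cached_dir")]))
```

  Taking a collection rather than varargs, as the comment suggests, avoids the unchecked generic-array creation that @SafeVarargs papers over in the Java version.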




[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354671048
 
 

 ##
 File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -126,14 +126,44 @@
 
	static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
		"Attach custom python files for job. " +
-			"Comma can be used as the separator to specify multiple files. " +
-			"The standard python resource file suffixes such as .py/.egg/.zip are all supported." +
-			"(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)");
+			"These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " +
+			"The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " +
+			"Comma (',') could be used as the separator to specify multiple files " +
+			"(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");
 
	static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
		"Python module with the program entry point. " +
			"This option must be used in conjunction with `--pyFiles`.");
 
+	static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true,
+		"Specify a requirements.txt file which defines the third-party dependencies. " +
+			"These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. " +
+			"A directory which contains the installation packages of these dependencies could be specified" +
+			"via second parameter. Use '#' as the separator of parameters if second parameter exists. " +
+			"(e.g.: --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir)");
+
+	static final Option PYARCHIVE_OPTION = new Option("pyarch", "pyArchives", true,
+		"Add python archive files for job. " +
 
 Review comment:
  What about keeping each line a similar length instead of one sentence per line?




[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354671362
 
 

 ##
 File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -126,14 +126,44 @@
 
	static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
		"Attach custom python files for job. " +
-			"Comma can be used as the separator to specify multiple files. " +
-			"The standard python resource file suffixes such as .py/.egg/.zip are all supported." +
-			"(eg: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip)");
+			"These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " +
+			"The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " +
+			"Comma (',') could be used as the separator to specify multiple files " +
+			"(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");
 
	static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
		"Python module with the program entry point. " +
			"This option must be used in conjunction with `--pyFiles`.");
 
+	static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true,
+		"Specify a requirements.txt file which defines the third-party dependencies. " +
+			"These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. " +
+			"A directory which contains the installation packages of these dependencies could be specified" +
+			"via second parameter. Use '#' as the separator of parameters if second parameter exists. " +
+			"(e.g.: --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir)");
+
+	static final Option PYARCHIVE_OPTION = new Option("pyarch", "pyArchives", true,
+		"Add python archive files for job. " +
+		"The file will be extracted to the working directory of python UDF worker. " +
 
 Review comment:
   file -> files




[GitHub] [flink] dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] Add command-line options for managing Python UDF environment and dependencies.

2019-12-05 Thread GitBox
dianfu commented on a change in pull request #10430: [FLINK-14019][python][cli] 
Add command-line options for managing Python UDF environment and dependencies.
URL: https://github.com/apache/flink/pull/10430#discussion_r354674349
 
 

 ##
 File path: flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
 ##
 @@ -80,51 +86,24 @@ public ProgramOptions(CommandLine line) throws CliArgsException {
		isPython = line.hasOption(PY_OPTION.getOpt()) | line.hasOption(PYMODULE_OPTION.getOpt())
			| "org.apache.flink.client.python.PythonGatewayServer".equals(entryPointClass);
		// If specified the option -py(--python)
-		if (line.hasOption(PY_OPTION.getOpt())) {
-			// Cannot use option -py and -pym simultaneously.
-			if (line.hasOption(PYMODULE_OPTION.getOpt())) {
-				throw new CliArgsException("Cannot use option -py and -pym simultaneously.");
-			}
-			// The cli cmd args which will be transferred to PythonDriver will be transformed as follows:
-			// CLI cmd : -py ${python.py} pyfs [optional] ${py-files} [optional] ${other args}.
-			// PythonDriver args: py ${python.py} [optional] pyfs [optional] ${py-files} [optional] ${other args}.
-			// ---transformed---
-			// e.g. -py wordcount.py(CLI cmd) ---> py wordcount.py(PythonDriver args)
-			// e.g. -py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(CLI cmd)
-			//  -> -py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt --output out.txt(PythonDriver args)
-			String[] newArgs;
-			int argIndex;
-			if (line.hasOption(PYFILES_OPTION.getOpt())) {
-				newArgs = new String[args.length + 4];
-				newArgs[2] = "-" + PYFILES_OPTION.getOpt();
-				newArgs[3] = line.getOptionValue(PYFILES_OPTION.getOpt());
-				argIndex = 4;
-			} else {
-				newArgs = new String[args.length + 2];
-				argIndex = 2;
-			}
-			newArgs[0] = "-" + PY_OPTION.getOpt();
-			newArgs[1] = line.getOptionValue(PY_OPTION.getOpt());
-			System.arraycopy(args, 0, newArgs, argIndex, args.length);
-			args = newArgs;
-		}
-
-		// If specified the option -pym(--pyModule)
-		if (line.hasOption(PYMODULE_OPTION.getOpt())) {
-			// If you specify the option -pym, you should specify the option --pyFiles simultaneously.
-			if (!line.hasOption(PYFILES_OPTION.getOpt())) {
-				throw new CliArgsException("-pym must be used in conjunction with `--pyFiles`");
+		if (isPython) {
 
 Review comment:
   Nice change!
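
  The argument transformation described in the removed comments can be sketched as follows (illustrative Python only; the real code builds a Java String[] as shown in the deleted block):

```python
def to_python_driver_args(args, python_file, py_files=None):
    """Prepend -py (and optionally -pyfs) to the remaining program
    arguments, mirroring the CLI-cmd -> PythonDriver-args transformation
    in the removed comments."""
    new_args = ["-py", python_file]
    if py_files is not None:
        new_args += ["-pyfs", py_files]
    return new_args + list(args)

print(to_python_driver_args(
    ["--input", "in.txt"], "wordcount.py", "file:///AAA.py,hdfs:///BBB.py"))
```

  e.g. `-py wordcount.py -pyfs file:///AAA.py,hdfs:///BBB.py --input in.txt` (CLI cmd) becomes the same token list passed through as PythonDriver args.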




[GitHub] [flink] flinkbot edited a comment on issue #10389: [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10389: [FLINK-14198][python] Add "type" and 
"rtype" options to flink python API docstrings of table.py and 
table_environment.py
URL: https://github.com/apache/flink/pull/10389#issuecomment-560984209
 
 
   
   ## CI report:
   
   * ebc6a4396f71fda053e32ed71994e5ce129835c1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139081057)
   * 1a14a71c2785b0c98831779b1e058f585bdc762c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139501852)
   * b1b324f72f4561126eb1dcbf144947d2b3ef2dc1 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139653098)
   * ed548c842564c3beca8b2afefa0048b6a5b879bb : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10315: [FLINK-14552][table] Enable partition statistics in blink planner

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10315: [FLINK-14552][table] Enable 
partition statistics in blink planner
URL: https://github.com/apache/flink/pull/10315#issuecomment-558221318
 
 
   
   ## CI report:
   
   * 05b30617c280796e80e8569dae48b726cddb23d8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138084321)
   * 707fc20d39d5a2c82bd4a3024cd02a932413af70 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138184336)
   * 748ccd842d5558cc250ab9fd35d7e30e21e57406 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138268848)
   * 5edccbaf0cf95635b4a81fe9b16bfefcddf2d07c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138883725)
   * 72ed8881457f007135b6faf4ff5dfeefb6911d7e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138972329)
   * 02498867d43f51e3c583b470cf39ca632737aa5d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139141699)
   * 84e826cef61d4e659669fa169bf6f6a73035048b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/13975)
   * 54bd514481c22e6bce418f120e38b91eb361ad42 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139442856)
   * 71eed85744450cd933f0b061108ee7c49017bd9f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139464310)
   * 71acb8878dad3d9966d43f27d3db3dace81ac301 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139470658)
   * ea45df4a4dd2cb13dcb37c1ada91d7c202cf708d : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139635199)
   
   




[GitHub] [flink] flinkbot commented on issue #10456: [FLINK-15091][table-planner-blink] Fix memory overused in SortMergeJoinOperator

2019-12-05 Thread GitBox
flinkbot commented on issue #10456: [FLINK-15091][table-planner-blink] Fix 
memory overused in SortMergeJoinOperator
URL: https://github.com/apache/flink/pull/10456#issuecomment-562468186
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit a6e02881eeedc154f73a8e1bd39dd98ce2a89e63 (Fri Dec 06 
07:41:46 UTC 2019)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
 The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] TisonKun closed pull request #10457: !FOR TESTING!

2019-12-05 Thread GitBox
TisonKun closed pull request #10457: !FOR TESTING!
URL: https://github.com/apache/flink/pull/10457
 
 
   




[GitHub] [flink] TisonKun opened a new pull request #10457: !FOR TESTING!

2019-12-05 Thread GitBox
TisonKun opened a new pull request #10457: !FOR TESTING!
URL: https://github.com/apache/flink/pull/10457
 
 
   




[jira] [Updated] (FLINK-15091) JoinITCase.testFullJoinWithNonEquiJoinPred failed in travis

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-15091:
---
Labels: pull-request-available  (was: )

> JoinITCase.testFullJoinWithNonEquiJoinPred failed in travis
> ---
>
> Key: FLINK-15091
> URL: https://issues.apache.org/jira/browse/FLINK-15091
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Kurt Young
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>
> 04:45:22.404 [ERROR] Tests run: 21, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 4.909 s <<< FAILURE! - in 
> org.apache.flink.table.planner.runtime.batch.table.JoinITCase 04:45:22.406 
> [ERROR] 
> testFullJoinWithNonEquiJoinPred(org.apache.flink.table.planner.runtime.batch.table.JoinITCase)
>  Time elapsed: 0.168 s <<< ERROR! 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at 
> org.apache.flink.table.planner.runtime.batch.table.JoinITCase.testFullJoinWithNonEquiJoinPred(JoinITCase.scala:344)
>  Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy Caused by: 
> org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 
> 32 pages. Only 0 pages are remaining.
>  
> details: [https://api.travis-ci.org/v3/job/621407747/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] JingsongLi opened a new pull request #10456: [FLINK-15091][table-planner-blink] Fix memory overused in SortMergeJoinOperator

2019-12-05 Thread GitBox
JingsongLi opened a new pull request #10456: [FLINK-15091][table-planner-blink] 
Fix memory overused in SortMergeJoinOperator
URL: https://github.com/apache/flink/pull/10456
 
 
   
   ## What is the purpose of the change
   
   In SortMergeJoinOperator, we should check whether the join is a full join; if it is, two externalBufferMemory allocations are needed.
   
   ## Brief change log
   
   Use two externalBufferMemory allocations when the SortMergeJoinOperator performs a full join.
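
   The accounting fix can be sketched as follows (illustrative Python only, not Flink's actual code; the premise that a full outer join needs two external buffers is taken from this PR description):

```python
def smj_external_buffer_memory(external_buffer_memory, is_full_outer_join):
    """Reserve external buffer memory for a sort-merge join: a full
    outer join buffers both inputs, so it needs two buffers; other
    join types need one."""
    buffers_needed = 2 if is_full_outer_join else 1
    return external_buffer_memory * buffers_needed

print(smj_external_buffer_memory(32 * 1024 * 1024, True))
```

   Under-reserving here is what let the operator use more memory than it had requested.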
   
   ## Verifying this change
   
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no




[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient

2019-12-05 Thread GitBox
KurtYoung commented on a change in pull request #10454: 
[FLINK-13195][sql-client] Add create(drop) table support for SqlClient
URL: https://github.com/apache/flink/pull/10454#discussion_r354685477
 
 

 ##
 File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -154,8 +160,29 @@ public ExecutionContext(
			ClusterClientServiceLoader clusterClientServiceLoader,
			Options commandLineOptions,
			List availableCommandLines) throws FlinkException {
+		this(
+			environment,
+			originalSessionContext,
+			null,
+			dependencies,
+			flinkConfig,
+			new DefaultClusterClientServiceLoader(),
+			commandLineOptions,
+			availableCommandLines);
+	}
+
+   public ExecutionContext(
 
 Review comment:
   make all constructors `private` if you want to introduce `Builder`




[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354697346
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link KubernetesResourceManager}.
+ */
+public class KubernetesResourceManagerTest extends KubernetesTestBase {
+
+   private static final Time TIMEOUT = Time.seconds(10L);
+
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+
+   private final String jobManagerHost = "jm-host1";
+
+   private Configuration flinkConfig;
+
+   private TestingKubernetesResourceManager resourceManager;
+
+   private Fli

[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354697070
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##

[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354697186
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##

[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354696896
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##

[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354696517
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends 
ActiveResourceManager
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is 
{clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = 
"%s-taskmanager-%d-%d";
+
+   private final Map workerNodes = new 
HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When ResourceManager failover, the max attempt should recover. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
 
 Review comment:
   Will use `long` instead.
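
   The change being agreed on can be sketched as below. The rationale, assumed here from how Flink resource managers generally work, is that this state is mutated only from the ResourceManager's single-threaded RPC main thread, so a plain `long` counter suffices and `AtomicLong` adds no needed safety. Class and method names are illustrative, not the actual `KubernetesResourceManager` API.

   ```java
   // Hedged sketch: plain long counters in place of AtomicLong, valid only
   // because all mutation happens on one (RPC main) thread. Names are
   // illustrative, not the real KubernetesResourceManager fields.
   final class AttemptAndPodCounters {
       // Recovered from previous attempts on ResourceManager failover.
       private long currentMaxAttemptId = 0;
       // Incremented for each new TaskManager pod that is created.
       private long currentMaxPodId = 0;

       // Called only from the RPC main thread, so no synchronization is needed.
       long nextPodId() {
           return ++currentMaxPodId;
       }

       void recoverMaxAttemptId(long observedAttemptId) {
           currentMaxAttemptId = Math.max(currentMaxAttemptId, observedAttemptId);
       }

       long getCurrentMaxAttemptId() {
           return currentMaxAttemptId;
       }
   }
   ```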




[GitHub] [flink] JingsongLi commented on a change in pull request #10452: [FLINK-15035][table-planner-blink] Introduce unknown memory setting to table in blink planner

2019-12-05 Thread GitBox
JingsongLi commented on a change in pull request #10452: 
[FLINK-15035][table-planner-blink] Introduce unknown memory setting to table in 
blink planner
URL: https://github.com/apache/flink/pull/10452#discussion_r354681473
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/BatchExecNode.scala
 ##
 @@ -32,4 +33,13 @@ trait BatchExecNode[T] extends ExecNode[BatchPlanner, T] 
with Logging {
 */
   def getDamBehavior: DamBehavior
 
+  def setManagedMemoryWeight[X](
+  transformation: Transformation[X], memoryBytes: Long): Transformation[X] 
= {
+// Using Bytes can easily overflow
+// Using MebiBytes to cast to int
+// Careful about zero
+val memoryMB = Math.max(1, (memoryBytes >> 20).toInt)
 
 Review comment:
   Yeah, it's a bit hacky; it looks like we are writing this hacky code to work around the runtime default value. Because our current `ManagedMemoryWeight` settings are very large, a default of 1 is comparatively tiny and not a serious problem, so I would prefer to just check in `ExecutorUtils#setBatchProperties` and log a warning when the weight is still the default 1.
   
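
   The conversion under discussion can be sketched in Java as follows. The method name is illustrative; the behavior mirrors the quoted Scala `Math.max(1, (memoryBytes >> 20).toInt)` for realistic memory sizes.

   ```java
   final class ManagedMemoryWeights {
       // Convert a byte count to mebibytes so the weight fits in an int:
       // shifting right by 20 divides by 1 MiB (1024 * 1024), avoiding the
       // overflow risk of keeping raw bytes. The max(1, ...) guard keeps
       // sub-MiB (or zero) inputs from producing a weight of 0, which is the
       // "careful about zero" note in the quoted diff.
       static int toMebiBytesWeight(long memoryBytes) {
           return (int) Math.max(1L, memoryBytes >> 20);
       }
   }
   ```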




[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354695465
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends 
ActiveResourceManager
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is 
{clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = 
"%s-taskmanager-%d-%d";
+
+   private final Map workerNodes = new 
HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When ResourceManager failover, the max attempt should recover. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
+
+   /** Current max pod index. When creating a new pod, it should increase 
one. */
+   private final AtomicLong currentMaxPodId = new AtomicLong(0);
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   private final List taskManagerStartCommand;
+
+   /** The number of pods requested, but not yet granted. */
+   private int numPendingPodRequests = 0;
+
+   public KubernetesResourceManager(
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   Configuration flinkConfig,
+ 

[GitHub] [flink] lamber-ken commented on issue #9751: [FLINK-14177] bump curator from 2.12.0 to 4.2.0

2019-12-05 Thread GitBox
lamber-ken commented on issue #9751: [FLINK-14177] bump curator from 2.12.0 to 
4.2.0
URL: https://github.com/apache/flink/pull/9751#issuecomment-562464972
 
 
   @flinkbot run travis


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354694048
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is {clusterId}-taskmanager-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
+
+   private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
+
+   /** Current max pod index. When creating a new pod, it should be increased by one. */
+   private final AtomicLong currentMaxPodId = new AtomicLong(0);
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   private final List<String> taskManagerStartCommand;
+
+   /** The number of pods requested, but not yet granted. */
+   private int numPendingPodRequests = 0;
+
+   public KubernetesResourceManager(
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   Configuration flinkConfig,
+ 
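The comment on `currentMaxAttemptId` in the hunk above says the max attempt ID should be recovered when the ResourceManager fails over. A minimal sketch of how such recovery could work, assuming pod names of the form `{clusterId}-taskmanager-{attemptId}-{podIndex}`; the class and parsing helper are hypothetical and not the PR's actual code:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: recover the max TaskManager attempt ID from the names
// of pods that survived a ResourceManager failover. Not the PR's actual code.
public class AttemptIdRecoveryDemo {

    // Pod names follow {clusterId}-taskmanager-{attemptId}-{podIndex},
    // so the attempt ID is the second-to-last dash-separated segment.
    static long attemptIdOf(String podName) {
        String[] parts = podName.split("-");
        return Long.parseLong(parts[parts.length - 2]);
    }

    public static void main(String[] args) {
        AtomicLong currentMaxAttemptId = new AtomicLong(0);
        String[] recoveredPods = {"flink-taskmanager-2-1", "flink-taskmanager-3-4"};
        for (String pod : recoveredPods) {
            // Keep the largest attempt ID seen among the recovered pods.
            currentMaxAttemptId.getAndUpdate(prev -> Math.max(prev, attemptIdOf(pod)));
        }
        System.out.println(currentMaxAttemptId.get()); // 3
    }
}
```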

[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement 
ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191
 
 
   
   ## CI report:
   
   * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133366796)
   * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133940607)
   * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133998545)
   * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134010321)
   * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/137145261)
   * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138017410)
   * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138730857)
   * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138770610)
   * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139116897)
   * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139256990)
   * ce01f742aba3a683b98a0ea7da47e678d30c12be : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139290769)
   * 7392ac618203c2544ab4a313a7068e03fcc147a7 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139453237)
   * 3b3e23e69bde65f504ea5d46ef7c2d55ba04c0af : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139459406)
   * 71c087913517972655181e0119588c886e4243cb : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139461705)
   * 9be9f9f2338a2a953a76cc60057554d7d90157f9 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139518666)
   * bb01ae34a8d6087f4aef4a989c0d14d576df601f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139523468)
   * fd9cdf56812f2d0c9d659b69d9fb243d58ef48a3 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139555865)
   * c647669902f574c3ee23adc7efbabc859d93c0e4 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139627616)
   * cc29fb3058998b1489451d53ad04d9e9886f4193 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139635222)
   * 3d315606bd4fdf826ef823631116a189810dab6a : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139641767)
   * 37bfdff9aa766deaac10d9c5407cc244d14d1085 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139643868)
   * daa0ff528a8ec3c2dcbd33927099cee2d8faa113 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139650255)
   * 17ba1baf2383844c1b0ca8aa8ca97a80646e77fb : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354693688
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is {clusterId}-taskmanager-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
+
+   private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
+
+   /** Current max pod index. When creating a new pod, it should be increased by one. */
+   private final AtomicLong currentMaxPodId = new AtomicLong(0);
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   private final List<String> taskManagerStartCommand;
+
+   /** The number of pods requested, but not yet granted. */
+   private int numPendingPodRequests = 0;
+
+   public KubernetesResourceManager(
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   Configuration flinkConfig,
+ 

[GitHub] [flink] flinkbot commented on issue #10455: [FLINK-15089][connectors] Puslar catalog

2019-12-05 Thread GitBox
flinkbot commented on issue #10455: [FLINK-15089][connectors] Puslar catalog
URL: https://github.com/apache/flink/pull/10455#issuecomment-562462991
 
 
   
   ## CI report:
   
   * 206bb09e47ff4f36514a93c96f7cadc30f62cd4b : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354693480
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is {clusterId}-taskmanager-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
+
+   private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
+
+   /** Current max pod index. When creating a new pod, it should be increased by one. */
+   private final AtomicLong currentMaxPodId = new AtomicLong(0);
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   private final List<String> taskManagerStartCommand;
+
+   /** The number of pods requested, but not yet granted. */
+   private int numPendingPodRequests = 0;
+
+   public KubernetesResourceManager(
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   Configuration flinkConfig,
+ 

[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354692195
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is {clusterId}-taskmanager-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
+
+   private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
+
+   /** Current max pod index. When creating a new pod, it should be increased by one. */
+   private final AtomicLong currentMaxPodId = new AtomicLong(0);
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   private final List<String> taskManagerStartCommand;
+
+   /** The number of pods requested, but not yet granted. */
+   private int numPendingPodRequests = 0;
+
+   public KubernetesResourceManager(
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   Configuration flinkConfig,
+ 

[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354692195
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is {clusterId}-taskmanager-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
+
+   private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
+
+   /** Current max pod index. When creating a new pod, it should be increased by one. */
+   private final AtomicLong currentMaxPodId = new AtomicLong(0);
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   private final List<String> taskManagerStartCommand;
+
+   /** The number of pods requested, but not yet granted. */
+   private int numPendingPodRequests = 0;
+
+   public KubernetesResourceManager(
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   Configuration flinkConfig,
+ 

[GitHub] [flink] flinkbot edited a comment on issue #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10403: [FLINK-14645][table] Support to keep 
nullability and precision when converting DataTypes to properties
URL: https://github.com/apache/flink/pull/10403#issuecomment-561208418
 
 
   
   ## CI report:
   
   * 038052c833940fbaef3111ed92525216f13edee2 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139161968)
   * b0cfd6fc06fb9c581fe74f762654cbb1355ff4a8 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139259047)
   * a7adb1327e1705666ae0614e1e4361d05256633d : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139263403)
   * 73d285470f1bc8475e4563fd23c307b0694dd35e : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139278723)
   * 6be2267f8937d80a0e4bb5db664bf56169089a1f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139296259)
   * 4be2bda46373f12c754183c67ad192b2715c37dd : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139339756)
   * c5d81539a92ad9d04e03c3ded1c4dcfa3d1b2bd0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139446926)
   * e1aefe79e1afe03398917b8c8bd8b95facab : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139562400)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #10389: [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10389: [FLINK-14198][python] Add "type" and 
"rtype" options to flink python API docstrings of table.py and 
table_environment.py
URL: https://github.com/apache/flink/pull/10389#issuecomment-560984209
 
 
   
   ## CI report:
   
   * ebc6a4396f71fda053e32ed71994e5ce129835c1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139081057)
   * 1a14a71c2785b0c98831779b1e058f585bdc762c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139501852)
   * b1b324f72f4561126eb1dcbf144947d2b3ef2dc1 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354691495
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link KubernetesResourceManager}.
+ */
+public class KubernetesResourceManagerTest extends KubernetesTestBase {
+
+   private static final Time TIMEOUT = Time.seconds(10L);
+
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+
+   private final String jobManagerHost = "jm-host1";
+
+   private Configuration flinkConfig;
+
+   private TestingKubernetesResourceManager resourceManager;
+
+   private FlinkK

[GitHub] [flink] wuchong commented on a change in pull request #10403: [FLINK-14645][table] Support to keep nullability and precision when converting DataTypes to properties

2019-12-05 Thread GitBox
wuchong commented on a change in pull request #10403: [FLINK-14645][table] 
Support to keep nullability and precision when converting DataTypes to 
properties
URL: https://github.com/apache/flink/pull/10403#discussion_r354691466
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/descriptors/OldCsv.java
 ##
 @@ -123,17 +142,41 @@ public OldCsv field(String fieldName, TypeInformation 
fieldType) {
 * This method can be called multiple times. The call order of this 
method defines
 * also the order of the fields in the format.
 *
+* NOTE: the fieldType string should follow the type string format defined in {@link LogicalTypeParser}.
+* This method also remains compatible with the old type strings defined in {@link TypeStringUtils},
+* but that support will be dropped in future versions because it relies on the old type system.
+*
 * @param fieldName the field name
 * @param fieldType the type string of the field
 */
public OldCsv field(String fieldName, String fieldType) {
+   if (isLegacyTypeString(fieldType)) {
+   // fallback to legacy parser
+   TypeInformation typeInfo = 
TypeStringUtils.readTypeInfo(fieldType);
+   return field(fieldName, 
TypeConversions.fromLegacyInfoToDataType(typeInfo));
+   } else {
+   return addField(fieldName, fieldType);
+   }
+   }
+
+   private OldCsv addField(String fieldName, String fieldType) {
if (schema.containsKey(fieldName)) {
throw new ValidationException("Duplicate field name " + 
fieldName + ".");
}
schema.put(fieldName, fieldType);
return this;
}
 
+   private boolean isLegacyTypeString(String fieldType) {
+   try {
+   LogicalTypeParser.parse(fieldType);
 
 Review comment:
   Sorry, I updated `Schema` but forgot to update `OldCsv`.
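
   The quoted diff is cut off before the body of `isLegacyTypeString`, so here is a hedged, self-contained sketch of the try-parse detection pattern the snippet describes: a type string is treated as "legacy" exactly when the new parser rejects it. The stand-in parser below is hypothetical and only accepts a toy grammar; it is not Flink's real `LogicalTypeParser`.

   ```java
   // Hypothetical sketch of the legacy-type-string detection discussed above.
   // Assumption: a type string is "legacy" if the new-style parser rejects it.
   public class LegacyTypeDetector {

       // Stand-in for LogicalTypeParser.parse: accepts a few new-style type strings.
       static void parseNewStyle(String type) {
           if (!type.matches("(INT|BIGINT|STRING|VARCHAR\\(\\d+\\))")) {
               throw new IllegalArgumentException("unparseable: " + type);
           }
       }

       public static boolean isLegacyTypeString(String fieldType) {
           try {
               parseNewStyle(fieldType);
               return false; // the new parser accepted it
           } catch (IllegalArgumentException e) {
               return true;  // fall back to the legacy TypeStringUtils path
           }
       }

       public static void main(String[] args) {
           System.out.println(isLegacyTypeString("VARCHAR(10)"));  // false
           System.out.println(isLegacyTypeString("Types.STRING")); // true
       }
   }
   ```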




[GitHub] [flink] wsry commented on a change in pull request #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.

2019-12-05 Thread GitBox
wsry commented on a change in pull request #10375: [FLINK-14845][runtime] 
Introduce data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#discussion_r354691010
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 ##
 @@ -54,6 +54,28 @@
.withDescription("Enable SSL support for the 
taskmanager data transport. This is applicable only when the" +
" global flag for internal SSL (" + 
SecurityOptions.SSL_INTERNAL_ENABLED.key() + ") is set to true");
 
+   /**
+* Boolean flag indicating whether shuffle data will be compressed.
+*
+* Note: Data is compressed per buffer (possibly a sliced buffer in pipelined mode), and compression incurs extra
+* CPU overhead, so it is most effective in IO-bound scenarios where the data compression ratio is high.
+*/
+   public static final ConfigOption DATA_COMPRESSION_ENABLED =
+   key("taskmanager.data.compression.enabled")
 
 Review comment:
   Thanks for the comments. IMO, configuring blocking and pipelined mode separately is better, so I will adopt the proposal.
   Maybe in the future we can make some of the network config options configurable per job, for example by passing the job-level configuration to the create method and overriding the default cluster configuration when initializing the InputGate and ResultPartition in Task.




[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354690468
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link KubernetesResourceManager}.
+ */
+public class KubernetesResourceManagerTest extends KubernetesTestBase {
+
+   private static final Time TIMEOUT = Time.seconds(10L);
+
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+
+   private final String jobManagerHost = "jm-host1";
+
+   private Configuration flinkConfig;
+
+   private TestingKubernetesResourceManager resourceManager;
+
+   private FlinkK

[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354690623
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for {@link ResourceManager} implementations which contains some 
common variables and methods.
+ */
+public abstract class ActiveResourceManager 
 
 Review comment:
   I think "active" means the `ResourceManager` actively and dynamically allocates resources from the resource management cluster; `ActiveResourceManagerFactory` exists for the same reason. If you insist, I could rename `ActiveResourceManager` to `ClusterResourceManager`.
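
   The distinction being discussed can be illustrated with a minimal sketch: an "active" resource manager requests new workers from the cluster framework (Kubernetes, YARN, Mesos) on demand, while a standalone one can only use externally started workers. All class names below are hypothetical stand-ins, not Flink's real classes.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Illustrative sketch (not Flink's actual hierarchy) of active vs.
   // standalone resource management.
   abstract class SketchResourceManager {
       final List<String> workers = new ArrayList<>();
       abstract void onSlotRequest();
   }

   class ActiveSketchResourceManager extends SketchResourceManager {
       @Override
       void onSlotRequest() {
           // Actively allocate a new worker from the cluster framework.
           workers.add("worker-" + (workers.size() + 1));
       }
   }

   class StandaloneSketchResourceManager extends SketchResourceManager {
       @Override
       void onSlotRequest() {
           // Passive: cannot create workers; relies on externally started TMs.
       }
   }

   public class ResourceManagerSketch {
       public static void main(String[] args) {
           SketchResourceManager active = new ActiveSketchResourceManager();
           active.onSlotRequest();
           System.out.println(active.workers.size()); // 1
       }
   }
   ```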




[GitHub] [flink] flinkbot edited a comment on issue #10246: [FLINK-10937][dist] Add kubernetes-entry.sh and kubernetes-session.sh.

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10246: [FLINK-10937][dist] Add 
kubernetes-entry.sh and kubernetes-session.sh.
URL: https://github.com/apache/flink/pull/10246#issuecomment-555375256
 
 
   
   ## CI report:
   
   * 9f06575f29a7cb1102357c8919b57b3471dc80fd : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/137142048)
   * ffd0fdc2709bd21ee9574be845fc4a24d938ca45 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138017483)
   * 24406e36296a0bf1c9b057811e54a03fb6abab9b : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354690086
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link KubernetesResourceManager}.
+ */
+public class KubernetesResourceManagerTest extends KubernetesTestBase {
+
+   private static final Time TIMEOUT = Time.seconds(10L);
+
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+
+   private final String jobManagerHost = "jm-host1";
+
+   private Configuration flinkConfig;
+
+   private TestingKubernetesResourceManager resourceManager;
+
+   private FlinkK

[GitHub] [flink] wangyang0918 commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
wangyang0918 commented on a change in pull request #9984: 
[FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354689980
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for {@link ResourceManager} implementations which contains some 
common variables and methods.
+ */
+public abstract class ActiveResourceManager 
+   extends ResourceManager {
+
+   /** The process environment variables. */
+   protected final Map env;
+
+   protected final int numSlotsPerTaskManager;
+
+   protected final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
+   protected final int defaultMemoryMB;
+
+   protected final Collection resourceProfilesPerWorker;
+
+   /**
+* The updated Flink configuration. The client uploaded configuration 
may be updated before passed on to
+* {@link ResourceManager}. For example, {@link 
TaskManagerOptions#MANAGED_MEMORY_SIZE}.
+*/
+   protected final Configuration flinkConfig;
+
+   /** Flink configuration uploaded by client. */
+   protected final Configuration flinkClientConfig;
+
+   public ActiveResourceManager(
+   Configuration flinkConfig,
+   Map env,
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   HighAvailabilityServices highAvailabilityServices,
+   HeartbeatServices heartbeatServices,
+   SlotManager slotManager,
+   JobLeaderIdService jobLeaderIdService,
+   ClusterInformation clusterInformation,
+   FatalErrorHandler fatalErrorHandler,
+   ResourceManagerMetricGroup resourceManagerMetricGroup) {
+   super(
+   rpcService,
+   resourceManagerEndpointId,
+   resourceId,
+   highAvailabilityServices,
+   heartbeatServices,
+   slotManager,
+   jobLeaderIdService,
+   clusterInformation,
+   fatalErrorHandler,
+   resourceManagerMetricGroup);
+
+   this.flinkConfig = flinkConfig;
+   this.env = env;
+
+   this.numSlotsPerTaskManager = 
flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+   this.taskExecutorResourceSpec = 
TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfig);
+   this.defaultMemoryMB = 
taskExecutorResourceSpec.getTotalProcessMemorySize().getMebiBytes();
+
+   this.resourceProfilesPerWorker = 
createWorkerSlotProfiles(fli

[jira] [Created] (FLINK-15091) JoinITCase.testFullJoinWithNonEquiJoinPred failed in travis

2019-12-05 Thread Kurt Young (Jira)
Kurt Young created FLINK-15091:
--

 Summary: JoinITCase.testFullJoinWithNonEquiJoinPred failed in 
travis
 Key: FLINK-15091
 URL: https://issues.apache.org/jira/browse/FLINK-15091
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: Kurt Young
Assignee: Jingsong Lee
 Fix For: 1.10.0


04:45:22.404 [ERROR] Tests run: 21, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 4.909 s <<< FAILURE! - in org.apache.flink.table.planner.runtime.batch.table.JoinITCase
04:45:22.406 [ERROR] testFullJoinWithNonEquiJoinPred(org.apache.flink.table.planner.runtime.batch.table.JoinITCase)  Time elapsed: 0.168 s <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.table.planner.runtime.batch.table.JoinITCase.testFullJoinWithNonEquiJoinPred(JoinITCase.scala:344)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 32 pages. Only 0 pages are remaining.

 

details: [https://api.travis-ci.org/v3/job/621407747/log.txt]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wangxlong commented on issue #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns

2019-12-05 Thread GitBox
wangxlong commented on issue #10369: [FLINK-14893][flink-core]Try child 
classLoader when parent classLoader could not load in parentFirstPatterns
URL: https://github.com/apache/flink/pull/10369#issuecomment-562458580
 
 
   > > > @wangxlong thanks for your contribution. Changes look good to me. It 
would be better if you can write a test for the specific case and I think it is 
somewhat necessary :-)
   > > 
   > > 
   > > Hi @TisonKun . I am sorry for replying late. I found 
ClassLoader#loadClass will invoke findClass when loadClass failed. So it is not 
necessary to invoke findClass by ourselves. What do you think of it?
   > 
   > OK I think it is the reason why we tend to add a test for bisect the 
behavior :-)
   > 
   > IIUC you mean that with or without this patch, classloading always works 
as expected, right?
   > 
   > If so, it'd better we just close this ticket.
   
   Yes, you are right. I will close this ticket. Thank you for your review.
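
   The delegation behavior this thread concluded on (the default `ClassLoader#loadClass` consults the parent first and only falls back to `findClass` when the parent fails) can be demonstrated with a minimal custom class loader; the counter field is just for illustration:

   ```java
   // Minimal demonstration of parent-first delegation: java.lang.ClassLoader's
   // default loadClass asks the parent first, and invokes this loader's
   // findClass only after the parent throws ClassNotFoundException - so an
   // explicit findClass fallback is redundant.
   public class DelegationDemo extends ClassLoader {

       int findClassCalls = 0; // counts how often delegation fell through

       @Override
       protected Class<?> findClass(String name) throws ClassNotFoundException {
           findClassCalls++; // reached only after the parent failed
           throw new ClassNotFoundException(name);
       }

       public static void main(String[] args) throws Exception {
           DelegationDemo loader = new DelegationDemo();
           // The parent (system/bootstrap loader) resolves this; findClass is skipped.
           loader.loadClass("java.lang.String");
           System.out.println(loader.findClassCalls); // 0
           try {
               loader.loadClass("no.such.Clazz"); // parent fails -> findClass runs
           } catch (ClassNotFoundException expected) {
               // expected: neither parent nor this loader can find the class
           }
           System.out.println(loader.findClassCalls); // 1
       }
   }
   ```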




[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354689510
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link KubernetesResourceManager}.
+ */
+public class KubernetesResourceManagerTest extends KubernetesTestBase {
+
+   private static final Time TIMEOUT = Time.seconds(10L);
+
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+
+   private final String jobManagerHost = "jm-host1";
+
+   private Configuration flinkConfig;
+
+   private TestingKubernetesResourceManager resourceManager;
+
+   private FlinkK

[GitHub] [flink] wangxlong closed pull request #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns

2019-12-05 Thread GitBox
wangxlong closed pull request #10369: [FLINK-14893][flink-core]Try child 
classLoader when parent classLoader could not load in parentFirstPatterns
URL: https://github.com/apache/flink/pull/10369
 
 
   




[jira] [Closed] (FLINK-14552) Enable partition statistics in blink planner

2019-12-05 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-14552.
--
Fix Version/s: 1.10.0
   Resolution: Fixed

master: 0d6b43959bb6a576c95549e845f7db5e51229740

> Enable partition statistics in blink planner
> 
>
> Key: FLINK-14552
> URL: https://issues.apache.org/jira/browse/FLINK-14552
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We need update statistics after partition pruning in 
> PushPartitionIntoTableSourceScanRule.





[GitHub] [flink] WeiZhong94 commented on issue #10389: [FLINK-14198][python] Add "type" and "rtype" options to flink python API docstrings of table.py and table_environment.py

2019-12-05 Thread GitBox
WeiZhong94 commented on issue #10389: [FLINK-14198][python] Add "type" and 
"rtype" options to flink python API docstrings of table.py and 
table_environment.py
URL: https://github.com/apache/flink/pull/10389#issuecomment-562457573
 
 
   @sunjincheng121 Thanks for your review! I have addressed your comments in 
the latest commit.




[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354687863
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerStateBuilder;
+import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.PodStatusBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link KubernetesResourceManager}.
+ */
+public class KubernetesResourceManagerTest extends KubernetesTestBase {
+
+   private static final Time TIMEOUT = Time.seconds(10L);
+
+   private TestingFatalErrorHandler testingFatalErrorHandler;
+
+   private final String jobManagerHost = "jm-host1";
+
+   private Configuration flinkConfig;
+
+   private TestingKubernetesResourceManager resourceManager;
+
+   private FlinkK

[GitHub] [flink] flinkbot commented on issue #10455: [FLINK-15089][connectors] Pulsar catalog

2019-12-05 Thread GitBox
flinkbot commented on issue #10455: [FLINK-15089][connectors] Pulsar catalog
URL: https://github.com/apache/flink/pull/10455#issuecomment-562457258
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 206bb09e47ff4f36514a93c96f7cadc30f62cd4b (Fri Dec 06 
06:58:15 UTC 2019)
   
   **Warnings:**
* **2 pom.xml files were touched**: Check for build and licensing issues.
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-15089).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] KurtYoung merged pull request #10315: [FLINK-14552][table] Enable partition statistics in blink planner

2019-12-05 Thread GitBox
KurtYoung merged pull request #10315: [FLINK-14552][table] Enable partition 
statistics in blink planner
URL: https://github.com/apache/flink/pull/10315
 
 
   




[jira] [Updated] (FLINK-15089) Pulsar Catalog

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-15089:
---
Labels: pull-request-available  (was: )

> Pulsar Catalog
> --
>
> Key: FLINK-15089
> URL: https://issues.apache.org/jira/browse/FLINK-15089
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Common
>Reporter: Yijie Shen
>Priority: Major
>  Labels: pull-request-available
>
> Per discussion on the mailing list, the Pulsar Catalog implementation is made 
> into a single task. The design doc is: 
> [https://docs.google.com/document/d/1LMnABtXn-wQedsmWv8hopvx-B-jbdr8-jHbIiDhdsoE/edit?usp=sharing]





[GitHub] [flink] yjshen opened a new pull request #10455: [FLINK-15089][connectors] Pulsar catalog

2019-12-05 Thread GitBox
yjshen opened a new pull request #10455: [FLINK-15089][connectors] Pulsar 
catalog
URL: https://github.com/apache/flink/pull/10455
 
 
   ## What is the purpose of the change
   
   This PR implements Pulsar catalog which is part of 
[FLIP-72](https://cwiki.apache.org/confluence/display/FLINK/FLIP-72%3A+Introduce+Pulsar+Connector).
   
   The Pulsar catalog supports the following features by extending 
`AbstractCatalog` API:
   - Open and close the catalog
   - Database operations: list, get, exists, create, drop (create and drop 
should first check if the user has sufficient permissions)
   - Table operations: list, get, exist, create, drop (create and drop should 
first check if the user has sufficient permissions)
   
   ## Brief change log
   
   - The main functionality of Catalog exists in `PulsarCatalog` and 
`PulsarMetadataReader`.
   - Pulsar Schema <-> Flink type conversion exists in `SchemaUtils`.
   
   ## Verifying this change
   
   This PR will be tested by unit tests and integration tests.
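The permission-checked create/drop flow described in the bullet points above can be sketched as follows. This is a hedged, self-contained illustration only: `SimplePulsarCatalogSketch` and its tenant-based permission check are hypothetical stand-ins and do not reflect the actual `PulsarCatalog`/`PulsarMetadataReader` API in the PR.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Minimal sketch: database operations that check the caller's permissions
// before mutating state, as the PR description requires for create and drop.
public class SimplePulsarCatalogSketch {

    private final Set<String> databases = new HashSet<>();
    private final Set<String> grantedTenants;

    public SimplePulsarCatalogSketch(String... grantedTenants) {
        this.grantedTenants = new HashSet<>(Arrays.asList(grantedTenants));
    }

    public boolean databaseExists(String name) {
        return databases.contains(name);
    }

    public void createDatabase(String tenant, String name) {
        checkPermission(tenant); // permission check comes first
        if (!databases.add(name)) {
            throw new IllegalStateException("Database already exists: " + name);
        }
    }

    public void dropDatabase(String tenant, String name) {
        checkPermission(tenant); // permission check comes first
        databases.remove(name);
    }

    private void checkPermission(String tenant) {
        if (!grantedTenants.contains(tenant)) {
            throw new SecurityException("Insufficient permissions on tenant " + tenant);
        }
    }
}
```

The design choice illustrated here is that permission failures surface as exceptions before any metadata is touched, so list/get/exists operations stay cheap and unguarded while mutations are gated.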
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes). Adds Pulsar 
dependencies.
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? 
(https://docs.google.com/document/d/1LMnABtXn-wQedsmWv8hopvx-B-jbdr8-jHbIiDhdsoE/edit?usp=sharing)
   




[jira] [Updated] (FLINK-15090) Reverse the dependency from flink-streaming-java to flink-client

2019-12-05 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen updated FLINK-15090:
--
Description: 
After FLIP-73 the dependencies are minor. Tasks I can find are

1. Move {{StreamGraphTranslator}} to {{flink-client}}.
2. Implement similar context environment of streaming as batch env in 
{{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}.

After this task we still have a dependency from {{flink-streaming-java}} to 
{{flink-java}} because of some input formats dependencies. We can break the 
dependencies as follow-ups.

cc [~aljoscha]

  was:
After FLIP-73 the dependencies are minor. Tasks I can find are

1. Move {{StreamGraphTranslator}} to {{flink-client}}.
2. Implement similar context environment of streaming as batch env in 
{{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}.

After this task we still have a dependency from {{flink-streaming-java}} to 
{{flink-java}} because some input formats dependencies. We can break the 
dependencies as follow-ups.

cc [~aljoscha]


> Reverse the dependency from flink-streaming-java to flink-client
> 
>
> Key: FLINK-15090
> URL: https://issues.apache.org/jira/browse/FLINK-15090
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zili Chen
>Priority: Major
> Fix For: 1.11.0
>
>
> After FLIP-73 the dependencies are minor. Tasks I can find are
> 1. Move {{StreamGraphTranslator}} to {{flink-client}}.
> 2. Implement similar context environment of streaming as batch env in 
> {{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}.
> After this task we still have a dependency from {{flink-streaming-java}} to 
> {{flink-java}} because of some input formats dependencies. We can break the 
> dependencies as follow-ups.
> cc [~aljoscha]





[jira] [Updated] (FLINK-15090) Reverse the dependency from flink-streaming-java to flink-client

2019-12-05 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen updated FLINK-15090:
--
Description: 
After FLIP-73 the dependencies are minor. Tasks I can find are

1. Move {{StreamGraphTranslator}} to {{flink-client}}.
2. Implement similar context environment of streaming as batch env in 
{{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}.

After this task we still have a dependency from {{flink-streaming-java}} to 
{{flink-java}} because some input formats dependencies. We can break the 
dependencies as follow-ups.

cc [~aljoscha]

  was:
After FLIP-73 the dependencies are minor. Tasks I can find are

1. Move {{StreamGraphTranslator}} to {{flink-client}}.
2. Implement similar context environment of streaming as {{flink-java}}. 
Set/Unset as context along with {{ExecutionEnvironment}}.

After this task we still have a dependency from {{flink-streaming-java}} to 
{{flink-java}} because some input formats dependencies. We can break the 
dependencies as follow-ups.

cc [~aljoscha]


> Reverse the dependency from flink-streaming-java to flink-client
> 
>
> Key: FLINK-15090
> URL: https://issues.apache.org/jira/browse/FLINK-15090
> Project: Flink
>  Issue Type: Improvement
>Reporter: Zili Chen
>Priority: Major
> Fix For: 1.11.0
>
>
> After FLIP-73 the dependencies are minor. Tasks I can find are
> 1. Move {{StreamGraphTranslator}} to {{flink-client}}.
> 2. Implement similar context environment of streaming as batch env in 
> {{flink-client}}. Set/Unset as context along with {{ExecutionEnvironment}}.
> After this task we still have a dependency from {{flink-streaming-java}} to 
> {{flink-java}} because some input formats dependencies. We can break the 
> dependencies as follow-ups.
> cc [~aljoscha]





[jira] [Created] (FLINK-15090) Reverse the dependency from flink-streaming-java to flink-client

2019-12-05 Thread Zili Chen (Jira)
Zili Chen created FLINK-15090:
-

 Summary: Reverse the dependency from flink-streaming-java to 
flink-client
 Key: FLINK-15090
 URL: https://issues.apache.org/jira/browse/FLINK-15090
 Project: Flink
  Issue Type: Improvement
Reporter: Zili Chen
 Fix For: 1.11.0


After FLIP-73 the dependencies are minor. Tasks I can find are

1. Move {{StreamGraphTranslator}} to {{flink-client}}.
2. Implement similar context environment of streaming as {{flink-java}}. 
Set/Unset as context along with {{ExecutionEnvironment}}.

After this task we still have a dependency from {{flink-streaming-java}} to 
{{flink-java}} because some input formats dependencies. We can break the 
dependencies as follow-ups.

cc [~aljoscha]





[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354682702
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
+
+   private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
+
+   /** Current max pod index. When creating a new pod, it should be increased by one. */
+   private final AtomicLong currentMaxPodId = new AtomicLong(0);
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   private final List<String> taskManagerStartCommand;
+
+   /** The number of pods requested, but not yet granted. */
+   private int numPendingPodRequests = 0;
+
+   public KubernetesResourceManager(
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   Configuration flinkConfig,
+
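The `TASK_MANAGER_POD_FORMAT` constant quoted above is expanded with `String.format` to name each TaskManager pod. A minimal sketch of that expansion (the class and method names here are illustrative, not from the PR):

```java
public class PodNameFormat {
    // Mirrors TASK_MANAGER_POD_FORMAT from the quoted code:
    // {clusterId}-taskmanager-{attemptId}-{podIndex}
    static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";

    // Illustrative helper: builds the pod name for a given cluster,
    // ResourceManager attempt, and pod index.
    static String podName(String clusterId, long attemptId, long podIndex) {
        return String.format(TASK_MANAGER_POD_FORMAT, clusterId, attemptId, podIndex);
    }

    public static void main(String[] args) {
        // e.g. cluster "flink-session", first attempt, second pod
        System.out.println(podName("flink-session", 1, 2)); // flink-session-taskmanager-1-2
    }
}
```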

[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354682441
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##

[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354680696
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for {@link ResourceManager} implementations which contains some common variables and methods.
+ */
+public abstract class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
 
 Review comment:
   TBH I think we need a better semantic qualifier than `Active` to distinguish it from 
`StandaloneResourceManager`. What we actually want to convey is that this resource manager is 
built on top of a resource framework.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354685294
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is {clusterId}-{taskmanager}-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
+
+   private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When the ResourceManager fails over, the max attempt ID should be recovered. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
 
 Review comment:
   If there are no concurrency issues with this variable, we can use a simple 
`long` type instead.
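A hypothetical sketch of that suggestion: if the field is only ever touched from the ResourceManager's single RPC main thread, the atomic wrapper buys nothing and a plain `long` is sufficient (the class and method names below are illustrative, not from the PR):

```java
public class AttemptCounter {
    // was: private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
    // Plain field is safe under the assumption of single-threaded access
    // from the RPC main thread.
    private long currentMaxAttemptId = 0;

    // Called while recovering previously running pods after a failover:
    // keep the largest attempt ID seen in the recovered pod names.
    void recoverAttempt(long attemptIdFromPodName) {
        currentMaxAttemptId = Math.max(currentMaxAttemptId, attemptIdFromPodName);
    }

    // Next attempt ID for newly created pods.
    long nextAttemptId() {
        return ++currentMaxAttemptId;
    }

    public static void main(String[] args) {
        AttemptCounter c = new AttemptCounter();
        c.recoverAttempt(3);
        System.out.println(c.nextAttemptId()); // 4
    }
}
```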




[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354672522
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManager.java
 ##
 @@ -0,0 +1,118 @@
+/**
+ * Base class for {@link ResourceManager} implementations which contains some common variables and methods.
+ */
+public abstract class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
+   extends ResourceManager<WorkerType> {
+
+   /** The process environment variables. */
+   protected final Map<String, String> env;
+
+   protected final int numSlotsPerTaskManager;
+
+   protected final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
+   protected final int defaultMemoryMB;
+
+   protected final Collection<ResourceProfile> resourceProfilesPerWorker;
+
+   /**
+    * The updated Flink configuration. The configuration uploaded by the client may be updated
+    * before it is passed on; for example, {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}.
+    */
+   protected final Configuration flinkConfig;
+
+   /** Flink configuration uploaded by client. */
+   protected final Configuration flinkClientConfig;
+
+   public ActiveResourceManager(
+   Configuration flinkConfig,
+   Map<String, String> env,
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   HighAvailabilityServices highAvailabilityServices,
+   HeartbeatServices heartbeatServices,
+   SlotManager slotManager,
+   JobLeaderIdService jobLeaderIdService,
+   ClusterInformation clusterInformation,
+   FatalErrorHandler fatalErrorHandler,
+   ResourceManagerMetricGroup resourceManagerMetricGroup) {
+   super(
+   rpcService,
+   resourceManagerEndpointId,
+   resourceId,
+   highAvailabilityServices,
+   heartbeatServices,
+   slotManager,
+   jobLeaderIdService,
+   clusterInformation,
+   fatalErrorHandler,
+   resourceManagerMetricGroup);
+
+   this.flinkConfig = flinkConfig;
+   this.env = env;
+
+   this.numSlotsPerTaskManager = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+   this.taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfig);
+   this.defaultMemoryMB = taskExecutorResourceSpec.getTotalProcessMemorySize().getMebiBytes();
+
+   this.resourceProfilesPerWorker = createWorkerSlotProfiles(flinkC
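The constructor above derives per-worker defaults from the configuration: the total process memory and the configured slot count determine what each slot gets. The arithmetic can be sketched without the Flink APIs (all names here are illustrative, not the actual `TaskExecutorResourceUtils` logic):

```java
public class WorkerDefaults {
    // Illustrative sketch: split a task manager's total process memory
    // evenly across its configured slots, as the per-worker slot
    // profiles are derived from these two configuration values.
    static int slotMemoryMB(int totalProcessMemoryMB, int numSlotsPerTaskManager) {
        return totalProcessMemoryMB / numSlotsPerTaskManager;
    }

    public static void main(String[] args) {
        // e.g. a 1728 MiB task manager with 4 slots
        System.out.println(slotMemoryMB(1728, 4)); // 432
    }
}
```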

[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354685039
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##

[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354682932
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##

[GitHub] [flink] zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
zhijiangW commented on a change in pull request #9984: [FLINK-9495][kubernetes] 
Implement ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#discussion_r354685940
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.kubernetes.kubeclient.TaskManagerPodParameter;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
+import org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.entrypoint.parser.CommandLineOptions;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ActiveResourceManager;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Kubernetes specific implementation of the {@link ResourceManager}.
+ */
+public class KubernetesResourceManager extends ActiveResourceManager<KubernetesWorkerNode>
+   implements FlinkKubeClient.PodCallbackHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class);
+
+   /** The taskmanager pod name pattern is {clusterId}-taskmanager-{attemptId}-{podIndex}. */
+   private static final String TASK_MANAGER_POD_FORMAT = "%s-taskmanager-%d-%d";
+
+   private final Map<ResourceID, KubernetesWorkerNode> workerNodes = new HashMap<>();
+
+   private final double defaultCpus;
+
+   /** When the ResourceManager fails over, the previous max attempt id should be recovered. */
+   private final AtomicLong currentMaxAttemptId = new AtomicLong(0);
+
+   /** Current max pod index. When creating a new pod, it should be increased by one. */
+   private final AtomicLong currentMaxPodId = new AtomicLong(0);
+
+   private final String clusterId;
+
+   private final FlinkKubeClient kubeClient;
+
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   private final List<String> taskManagerStartCommand;
+
+   /** The number of pods requested, but not yet granted. */
+   private int numPendingPodRequests = 0;
+
+   public KubernetesResourceManager(
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   Configuration flinkConfig,
+
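The pod naming scheme described by `TASK_MANAGER_POD_FORMAT` in the snippet above can be sketched as follows. The cluster id, attempt id, and pod index values here are hypothetical; in the actual class, the attempt id comes from the recovered `currentMaxAttemptId` and the pod index from `currentMaxPodId`.

```java
public class PodNameDemo {
    // Mirrors the TASK_MANAGER_POD_FORMAT pattern quoted above:
    // {clusterId}-taskmanager-{attemptId}-{podIndex}
    static String podName(String clusterId, long attemptId, long podIndex) {
        return String.format("%s-taskmanager-%d-%d", clusterId, attemptId, podIndex);
    }

    public static void main(String[] args) {
        // Hypothetical values for illustration only.
        System.out.println(podName("my-flink-cluster", 1, 3));
        // -> my-flink-cluster-taskmanager-1-3
    }
}
```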

[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient

2019-12-05 Thread GitBox
KurtYoung commented on a change in pull request #10454: 
[FLINK-13195][sql-client] Add create(drop) table support for SqlClient
URL: https://github.com/apache/flink/pull/10454#discussion_r354682103
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 ##
 @@ -534,6 +540,26 @@ private boolean callInsertInto(SqlCommandCall cmdCall) {
return true;
}
 
+   private void callCreateTable(SqlCommandCall cmdCall) {
+   try {
+   executor.createTable(sessionId, cmdCall.operands[0]);
+   printInfo(CliStrings.MESSAGE_TABLE_CREATED);
+   } catch (SqlExecutionException e) {
+   printExecutionException(e);
+   return;
+   }
+   terminal.flush();
 
 Review comment:
   why flush terminal here but not in drop table?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient

2019-12-05 Thread GitBox
KurtYoung commented on a change in pull request #10454: 
[FLINK-13195][sql-client] Add create(drop) table support for SqlClient
URL: https://github.com/apache/flink/pull/10454#discussion_r354685477
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -154,8 +160,29 @@ public ExecutionContext(
ClusterClientServiceLoader clusterClientServiceLoader,
Options commandLineOptions,
List availableCommandLines) throws 
FlinkException {
+   this(
+   environment,
+   originalSessionContext,
+   null,
+   dependencies,
+   flinkConfig,
+   new DefaultClusterClientServiceLoader(),
+   commandLineOptions,
+   availableCommandLines);
+   }
+
+   public ExecutionContext(
 
 Review comment:
  make all constructors `private` if you want to introduce a `Builder`




[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient

2019-12-05 Thread GitBox
KurtYoung commented on a change in pull request #10454: 
[FLINK-13195][sql-client] Add create(drop) table support for SqlClient
URL: https://github.com/apache/flink/pull/10454#discussion_r354684030
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -428,6 +485,12 @@ private void initializeTableEnvironment() {
modules.forEach(tableEnv::loadModule);
}
 
+   if (catalogManager != null) {
+   registerFunctions();
 
 Review comment:
   why register function if we already have the catalog manager?




[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient

2019-12-05 Thread GitBox
KurtYoung commented on a change in pull request #10454: 
[FLINK-13195][sql-client] Add create(drop) table support for SqlClient
URL: https://github.com/apache/flink/pull/10454#discussion_r354684496
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -584,4 +655,69 @@ private Pipeline createPipeline(String name) {
return execEnv.createProgramPlan(name);
}
}
+
+   //~ Inner Class 
---
+
+   /** Builder for {@link ExecutionContext}. */
+   public static class Builder {
+   // Required members.
+   private final SessionContext sessionContext;
+   private final List dependencies;
+   private final Configuration configuration;
+   private final ClusterClientServiceLoader serviceLoader;
+   private final Options commandLineOptions;
+   private final List commandLines;
+
+   private Environment defaultEnv;
+   private Environment env;
+
+   // Optional members.
 
 Review comment:
   use Nullable?




[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient

2019-12-05 Thread GitBox
KurtYoung commented on a change in pull request #10454: 
[FLINK-13195][sql-client] Add create(drop) table support for SqlClient
URL: https://github.com/apache/flink/pull/10454#discussion_r354684576
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -584,4 +655,69 @@ private Pipeline createPipeline(String name) {
return execEnv.createProgramPlan(name);
}
}
+
+   //~ Inner Class 
---
+
+   /** Builder for {@link ExecutionContext}. */
+   public static class Builder {
+   // Required members.
+   private final SessionContext sessionContext;
+   private final List dependencies;
+   private final Configuration configuration;
+   private final ClusterClientServiceLoader serviceLoader;
+   private final Options commandLineOptions;
+   private final List commandLines;
+
+   private Environment defaultEnv;
+   private Environment env;
 
 Review comment:
   use another meaningful name rather than `env`




[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient

2019-12-05 Thread GitBox
KurtYoung commented on a change in pull request #10454: 
[FLINK-13195][sql-client] Add create(drop) table support for SqlClient
URL: https://github.com/apache/flink/pull/10454#discussion_r354683491
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -428,6 +485,12 @@ private void initializeTableEnvironment() {
modules.forEach(tableEnv::loadModule);
}
 
+   if (catalogManager != null) {
+   registerFunctions();
+   // No need to register the catalogs info if already 
inherit from the same session.
+   return;
 
 Review comment:
   step 7 can't be skipped?




[jira] [Commented] (FLINK-14952) Yarn containers can exceed physical memory limits when using BoundedBlockingSubpartition.

2019-12-05 Thread Yingjie Cao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989448#comment-16989448
 ] 

Yingjie Cao commented on FLINK-14952:
-

[~gjy] By "FILE-FILE" mode, I mean using BoundedBlockingSubpartitionType.FILE. 
As explained by [~pnowojski], results are written to and read from a file using 
the read/write API of FileChannel. When reading from files, two buffers are 
needed by each subpartition to store the read data. For the MMAP read mode, 
these extra read buffers are not needed.
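The FileChannel read/write path described above can be illustrated with a minimal sketch. This is not Flink's actual implementation; it only shows that the `read()` call needs an explicit, separately allocated buffer, which is the per-subpartition memory the MMAP mode avoids.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileChannelReadDemo {
    // Write a payload to a temp file and read it back through an explicit
    // heap buffer, mimicking the FILE mode's read path.
    static String roundTrip(String payload) throws IOException {
        Path tmp = Files.createTempFile("subpartition", ".data");
        try {
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                ch.write(ByteBuffer.wrap(payload.getBytes()));
            }
            // The buffer passed to read() is the extra memory FILE mode needs.
            ByteBuffer readBuffer = ByteBuffer.allocate(64);
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
                ch.read(readBuffer);
            }
            readBuffer.flip();
            byte[] bytes = new byte[readBuffer.remaining()];
            readBuffer.get(bytes);
            return new String(bytes);
        } finally {
            Files.delete(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("segment-1")); // prints segment-1
    }
}
```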

> Yarn containers can exceed physical memory limits when using 
> BoundedBlockingSubpartition.
> -
>
> Key: FLINK-14952
> URL: https://issues.apache.org/jira/browse/FLINK-14952
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Network
>Affects Versions: 1.9.1
>Reporter: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.10.0
>
>
> As [reported by a user on the user mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoGroup-SortMerger-performance-degradation-from-1-6-4-1-9-1-td31082.html],
>  combination of using {{BoundedBlockingSubpartition}} with yarn containers 
> can cause yarn container to exceed memory limits.
> {quote}2019-11-19 12:49:23,068 INFO org.apache.flink.yarn.YarnResourceManager 
> - Closing TaskExecutor connection container_e42_1574076744505_9444_01_04 
> because: Container 
> [pid=42774,containerID=container_e42_1574076744505_9444_01_04] is running 
> beyond physical memory limits. Current usage: 12.0 GB of 12 GB physical 
> memory used; 13.9 GB of 25.2 GB virtual memory used. Killing container.
> {quote}
> This is probably happening because the memory usage of mmap is not capped and not
> accounted for by the configured memory limits; however, YARN tracks this memory
> usage, and once Flink exceeds some threshold, the container is killed.
> The workaround is to overrule the default value and force Flink to not use mmap, by
> setting a secret (🤫) config option:
> {noformat}
> taskmanager.network.bounded-blocking-subpartition-type: file
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient

2019-12-05 Thread GitBox
KurtYoung commented on a change in pull request #10454: 
[FLINK-13195][sql-client] Add create(drop) table support for SqlClient
URL: https://github.com/apache/flink/pull/10454#discussion_r354684829
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 ##
 @@ -198,37 +198,24 @@ public void start() {
// nothing to do yet
}
 
-   /**
-* Create a new {@link ExecutionContext} by merging the default environment with the environment in the session context.
-*/
-   private ExecutionContext createExecutionContext(SessionContext sessionContext) {
-   Environment mergedEnv = Environment.merge(defaultEnvironment, sessionContext.getSessionEnv());
-   return createExecutionContext(mergedEnv, sessionContext);
-   }
-
-   /**
-* Create a new {@link ExecutionContext} by using the given environment.
-*/
-   private ExecutionContext createExecutionContext(Environment environment, SessionContext sessionContext) {
-   try {
-   return new ExecutionContext<>(
-   environment,
-   sessionContext,
-   dependencies,
-   flinkConfig,
-   clusterClientServiceLoader,
-   commandLineOptions,
-   commandLines);
-   } catch (Throwable t) {
-   // catch everything such that a configuration does not crash the executor
-   throw new SqlExecutionException("Could not create execution context.", t);
-   }
+   /** Returns an ExecutionContext.Builder for the given {@link SessionContext}. */
+   private ExecutionContext.Builder execContextBuilder(SessionContext sessionContext) {
 
 Review comment:
   createExecutionContextBuilder?




[GitHub] [flink] KurtYoung commented on a change in pull request #10454: [FLINK-13195][sql-client] Add create(drop) table support for SqlClient

2019-12-05 Thread GitBox
KurtYoung commented on a change in pull request #10454: 
[FLINK-13195][sql-client] Add create(drop) table support for SqlClient
URL: https://github.com/apache/flink/pull/10454#discussion_r354683869
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ##
 @@ -428,6 +485,12 @@ private void initializeTableEnvironment() {
modules.forEach(tableEnv::loadModule);
}
 
+   if (catalogManager != null) {
 
 Review comment:
   I think you can split the logic into 2 groups:
   1. initializing this with the previous catalog manager
   2. initializing this without a previous catalog manager
   
   and do an if-else here. Steps 3-6 can be extracted into a new init-catalog method.




[GitHub] [flink] flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement ResourceManager for Kubernetes.

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #9984: [FLINK-9495][kubernetes] Implement 
ResourceManager for Kubernetes.
URL: https://github.com/apache/flink/pull/9984#issuecomment-545881191
 
 
   
   ## CI report:
   
   * 802ebf37e3d932169f9826b40df483bb5e9ac064 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133366796)
   * f16938ce2fb38ae216def737d14643b94d6083a1 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/133940607)
   * 20de5cfc7af9a8ba57080d5218fd0293f393a40e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/133998545)
   * 56bfbb65802c1d5c48caa625a152070934bb5d79 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/134010321)
   * 3ea229382fef64b1046673c79ff845d4689c5db4 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/137145261)
   * 3169988a33e0126e79cd449740c93d3561296ead : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138017410)
   * a0dd858b0b91443fc87895a2d32ebfbbc0b9fe4c : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138730857)
   * 86b4537979265a0fbecf7c1841ed8fd2f7ebfd86 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138770610)
   * d77f83e133b497e42cd85aeaf95e625411274c92 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139116897)
   * f2ed0e4b8d37dc3d5b2b770d943e703f4d893da0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139256990)
   * ce01f742aba3a683b98a0ea7da47e678d30c12be : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139290769)
   * 7392ac618203c2544ab4a313a7068e03fcc147a7 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139453237)
   * 3b3e23e69bde65f504ea5d46ef7c2d55ba04c0af : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139459406)
   * 71c087913517972655181e0119588c886e4243cb : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139461705)
   * 9be9f9f2338a2a953a76cc60057554d7d90157f9 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139518666)
   * bb01ae34a8d6087f4aef4a989c0d14d576df601f : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139523468)
   * fd9cdf56812f2d0c9d659b69d9fb243d58ef48a3 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139555865)
   * c647669902f574c3ee23adc7efbabc859d93c0e4 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139627616)
   * cc29fb3058998b1489451d53ad04d9e9886f4193 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139635222)
   * 3d315606bd4fdf826ef823631116a189810dab6a : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139641767)
   * 37bfdff9aa766deaac10d9c5407cc244d14d1085 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139643868)
   * daa0ff528a8ec3c2dcbd33927099cee2d8faa113 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10453: [FLINK-14026][python] Manage the resource of Python worker properly

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10453: [FLINK-14026][python] Manage the 
resource of Python worker properly
URL: https://github.com/apache/flink/pull/10453#issuecomment-562427019
 
 
   
   ## CI report:
   
   * 3204767d620e3f8469819ef3155a6cc559745860 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139641761)
   * 19e8894ca5fc0f90e02694e0400b4e67adbbb385 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139643834)
   * 3650c724e5c3ffe5734569c5e31bfe049c04a6de : UNKNOWN
   * 93899035a1541efd6a44b3ff85bad868bb87f903 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/139645881)
   
   




[jira] [Created] (FLINK-15089) Pulsar Catalog

2019-12-05 Thread Yijie Shen (Jira)
Yijie Shen created FLINK-15089:
--

 Summary: Pulsar Catalog
 Key: FLINK-15089
 URL: https://issues.apache.org/jira/browse/FLINK-15089
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Reporter: Yijie Shen


Per discussion on the mailing list, the Pulsar Catalog implementation is made a 
separate task. The design doc is: 
[https://docs.google.com/document/d/1LMnABtXn-wQedsmWv8hopvx-B-jbdr8-jHbIiDhdsoE/edit?usp=sharing]





[GitHub] [flink] flinkbot edited a comment on issue #10375: [FLINK-14845][runtime] Introduce data compression to reduce disk and network IO of shuffle.

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10375: [FLINK-14845][runtime] Introduce 
data compression to reduce disk and network IO of shuffle.
URL: https://github.com/apache/flink/pull/10375#issuecomment-560214201
 
 
   
   ## CI report:
   
   * af7ab17308bafebf22198881ad59ec4516da43e2 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138883739)
   * 02ec2e94d3a782c06b4c46c93445a098887b4e55 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/138891350)
   * a83e8f46acdc7e4a9e9b1c4f0de83758d4a4e77e : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138893241)
   * 1c62e5a4d0f73a0150104a5b1587a674e4369a2b : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/138941812)
   * fc33c72e50a2db55f9ce029d8af558ecedbcc39f : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139083473)
   * 2f538ab20262fbe9981d4851b93e9bee04fc6ad0 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139147407)
   * 915e45f5d61583e7578751944ac46766505a8c34 : UNKNOWN
   * 29936b5ac7e15863c1b109a17ac7cc07dd7cc225 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139541810)
   * 055230e7259e53159af46b11be308e88be935d57 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139555800)
   * 727261f2bd508985133bb86703727cd1eb93363d : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/139562347)
   
   




[GitHub] [flink] cshuo commented on a change in pull request #10316: [FLINK-14624][table-blink] Support computed column as rowtime attribute

2019-12-05 Thread GitBox
cshuo commented on a change in pull request #10316: [FLINK-14624][table-blink] 
Support computed column as rowtime attribute
URL: https://github.com/apache/flink/pull/10316#discussion_r354680497
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/MiniBatchedWatermarkAssignerOperator.java
 ##
 @@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.operators.wmassigners;
-
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
-import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.table.runtime.generated.WatermarkGenerator;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/**
- * A stream operator that extracts timestamps from stream elements and
- * generates watermarks with specified emit latency.
- *
- * <p>The difference between this operator and {@link WatermarkAssignerOperator} is that:
- * <ul>
- * <li>This operator has an additional parameter {@link #minibatchInterval} which is
- * used to buffer watermarks and emit them in an aligned interval with following window operators.
- * </ul>
- */
-public class MiniBatchedWatermarkAssignerOperator
-   extends AbstractStreamOperator<BaseRow>
-   implements OneInputStreamOperator<BaseRow, BaseRow>, ProcessingTimeCallback {
-
-   private static final long serialVersionUID = 1L;
-
-   /** The field index of rowtime attribute. */
-   private final int rowtimeFieldIndex;
-
-   /** The watermark generator which generates watermark from the input row. */
-   private final WatermarkGenerator watermarkGenerator;
-
-   /** The idle timeout: how long without receiving elements before this channel is marked idle. */
-   private final long idleTimeout;
-
-   /** The event-time interval for emitting watermarks. */
-   private final long minibatchInterval;
-
-   /** Current watermark of this operator, but may not be emitted. */
-   private transient long currentWatermark;
-
-   /** The next watermark to be emitted. */
-   private transient long nextWatermark;
-
-   /** The processing time when the last record is processed. */
-   private transient long lastRecordTime;
-
-   /** Channel status maintainer which is used to mark the channel inactive when it is idle. */
-   private transient StreamStatusMaintainer streamStatusMaintainer;
-
-   /** Flag to prevent duplicate function.close() calls in close() and dispose(). */
-   private transient boolean functionsClosed = false;
-
-   public MiniBatchedWatermarkAssignerOperator(
-   int rowtimeFieldIndex,
-   WatermarkGenerator watermarkGenerator,
-   long idleTimeout,
-   long minibatchInterval) {
-   checkArgument(minibatchInterval > 0, "The inferred emit latency should be larger than 0");
-   this.rowtimeFieldIndex = rowtimeFieldIndex;
-   this.watermarkGenerator = watermarkGenerator;
-   this.idleTimeout = idleTimeout;
-   this.minibatchInterval = minibatchInterval;
-   this.chainingStrategy = ChainingStrategy.ALWAYS;
-   }
-
-   @Override
-   public void open() throws Exception {
-   super.open();
-
-   currentWatermark = 0;
-   nextWatermark = getMiniBatchStart(currentWatermark, minibatchInterval) + minibatchInterval - 1;
-
-   if (idleTimeout > 0) {
-   this.lastRecordTime = 
getProce
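The mini-batch alignment arithmetic quoted above (`getMiniBatchStart(currentWatermark, minibatchInterval) + minibatchInterval - 1`) can be sketched as follows. The semantics of `getMiniBatchStart` are assumed here to be flooring the watermark to the interval boundary; the real helper may differ.

```java
public class MiniBatchAlignDemo {
    // Assumed semantics: floor the watermark down to the interval boundary.
    static long getMiniBatchStart(long watermark, long interval) {
        return watermark - watermark % interval;
    }

    public static void main(String[] args) {
        long currentWatermark = 0;
        long minibatchInterval = 100;
        // Next watermark to emit: the end of the current aligned interval,
        // so downstream window operators see batch-aligned watermarks.
        long nextWatermark = getMiniBatchStart(currentWatermark, minibatchInterval)
            + minibatchInterval - 1;
        System.out.println(nextWatermark); // prints 99
    }
}
```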

[GitHub] [flink] cshuo commented on a change in pull request #10316: [FLINK-14624][table-blink] Support computed column as rowtime attribute

2019-12-05 Thread GitBox
cshuo commented on a change in pull request #10316: [FLINK-14624][table-blink] 
Support computed column as rowtime attribute
URL: https://github.com/apache/flink/pull/10316#discussion_r354677891
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/MiniBatchIntervalInferRule.scala
 ##
 @@ -120,9 +116,22 @@ class MiniBatchIntervalInferRule extends RelOptRule(
 }
   }
 
-  private def isTableSourceScan(node: RelNode): Boolean = node match {
-case _: StreamExecDataStreamScan | _: StreamExecTableSourceScan => true
-case _ => false
+  private def shouldAppendMiniBatchAssignerNode(node: RelNode): Boolean = {
+val mode = node.getTraitSet
+  .getTrait(MiniBatchIntervalTraitDef.INSTANCE)
+  .getMiniBatchInterval
+  .mode
+node match {
+  case _: StreamExecDataStreamScan | _: StreamExecTableSourceScan =>
+// append minibatch node if the mode is not NONE and reach a source 
leaf node
+mode == MiniBatchMode.RowTime || mode == MiniBatchMode.ProcTime
+  case _: StreamExecWatermarkAssigner  =>
 
 Review comment:
   IIUC, for the case where there is only a group aggregation with a **redundant 
watermark** defined in the SQL job, minibatch will not work even when the user 
sets `table.exec.mini-batch.enabled = true`, which confuses users as this is 
transparent to them. I'm wondering if some checks can be added to alert the user?




[GitHub] [flink] JingsongLi commented on a change in pull request #10452: [FLINK-15035][table-planner-blink] Introduce unknown memory setting to table in blink planner

2019-12-05 Thread GitBox
JingsongLi commented on a change in pull request #10452: 
[FLINK-15035][table-planner-blink] Introduce unknown memory setting to table in 
blink planner
URL: https://github.com/apache/flink/pull/10452#discussion_r354681473
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/BatchExecNode.scala
 ##
 @@ -32,4 +33,13 @@ trait BatchExecNode[T] extends ExecNode[BatchPlanner, T] 
with Logging {
 */
   def getDamBehavior: DamBehavior
 
+  def setManagedMemoryWeight[X](
+  transformation: Transformation[X], memoryBytes: Long): Transformation[X] 
= {
+// Using Bytes can easily overflow
+// Using MebiBytes to cast to int
+// Careful about zero
+val memoryMB = Math.max(1, (memoryBytes >> 20).toInt)
 
 Review comment:
   Yeah, it's a bit hacky; it looks like we create this hacky code to work around 
the runtime default value.
   Because our current `ManagedMemoryWeight` settings are very large, the default 
of 1 is very small and not a serious problem, so I prefer to just check in 
`ExecutorUtils#setBatchProperties` and log a warning when the weight is the 
default 1.
   


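The byte-to-MiB conversion discussed in the review comment above (`Math.max(1, (memoryBytes >> 20).toInt)`) can be sketched in isolation. The point is that a weight kept in bytes can overflow an int, while whole MiB stays in range; the clamp to 1 avoids a zero weight for tiny values.

```java
public class MemoryWeightDemo {
    // Convert a byte count to whole MiB, clamped to at least 1, matching
    // the (memoryBytes >> 20) pattern in the quoted diff.
    static int toMebiBytes(long memoryBytes) {
        return (int) Math.max(1, memoryBytes >> 20);
    }

    public static void main(String[] args) {
        System.out.println(toMebiBytes(512L * 1024 * 1024)); // prints 512
        System.out.println(toMebiBytes(1000));               // clamped: prints 1
        System.out.println(toMebiBytes(3L << 30));           // 3 GiB: prints 3072
    }
}
```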


[GitHub] [flink] flinkbot edited a comment on issue #10449: [FLINK-7289][state-backend] Integrate RocksDB memory use with managed memory

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10449: [FLINK-7289][state-backend] 
Integrate RocksDB memory use with managed memory
URL: https://github.com/apache/flink/pull/10449#issuecomment-562311621
 
 
   
   ## CI report:
   
   * 354f5dcf865b2de3b8b426ae40f6633ff1755e4a : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/13956)
   
   




[GitHub] [flink] flinkbot edited a comment on issue #10453: [FLINK-14026][python] Manage the resource of Python worker properly

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10453: [FLINK-14026][python] Manage the 
resource of Python worker properly
URL: https://github.com/apache/flink/pull/10453#issuecomment-562427019
 
 
   
   ## CI report:
   
   * 3204767d620e3f8469819ef3155a6cc559745860 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139641761)
   * 19e8894ca5fc0f90e02694e0400b4e67adbbb385 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/139643834)
   * 3650c724e5c3ffe5734569c5e31bfe049c04a6de : UNKNOWN
   * 93899035a1541efd6a44b3ff85bad868bb87f903 : UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] flinkbot edited a comment on issue #10445: [FLINK-14407][yarn] Retry S3 tests on failure

2019-12-05 Thread GitBox
flinkbot edited a comment on issue #10445: [FLINK-14407][yarn] Retry S3 tests 
on failure
URL: https://github.com/apache/flink/pull/10445#issuecomment-562177526
 
 
   
   ## CI report:
   
   * 9709c3ce629fb1ee84c180a92b9077dc56a18ab0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/139548882)
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
   




[GitHub] [flink] TisonKun edited a comment on issue #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns

2019-12-05 Thread GitBox
TisonKun edited a comment on issue #10369: [FLINK-14893][flink-core]Try child 
classLoader when parent classLoader could not load in parentFirstPatterns
URL: https://github.com/apache/flink/pull/10369#issuecomment-562445489
 
 
   > > @wangxlong thanks for your contribution. Changes look good to me. It 
would be better if you can write a test for the specific case and I think it is 
somewhat necessary :-)
   > 
   > Hi @TisonKun . I am sorry for replying late. I found ClassLoader#loadClass 
will invoke findClass when loadClass failed. So it is not necessary to invoke 
findClass by ourselves. What do you think of it?
   
   OK, I think that is exactly why we tend to add a test to pin down the 
behavior :-)
   
   IIUC you mean that with or without this patch, classloading always works as 
expected, right?
   
   If so, it'd be better to just close this ticket.
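   For readers following the thread, the pattern under discussion can be 
sketched roughly as below. This is a minimal illustration, not Flink's actual 
class loader implementation: try the parent loader first, and fall back to the 
child's own `findClass` only when the parent cannot resolve the class. As the 
comment above notes, `ClassLoader#loadClass` already performs this fallback to 
`findClass` when the parent throws, so the explicit catch here only makes the 
delegation order visible.

```java
import java.util.HashMap;
import java.util.Map;

class ParentFirstWithFallbackLoader extends ClassLoader {

    // Hypothetical store of classes only the child knows about
    // (class name -> bytecode); empty in the simplest usage.
    private final Map<String, byte[]> childClasses;

    ParentFirstWithFallbackLoader(ClassLoader parent, Map<String, byte[]> childClasses) {
        super(parent);
        this.childClasses = childClasses;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        try {
            // Parent-first attempt; the default implementation already
            // falls back to findClass when the parent fails.
            return super.loadClass(name, resolve);
        } catch (ClassNotFoundException e) {
            // Explicit child-side fallback, shown for clarity.
            Class<?> c = findClass(name);
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        byte[] bytes = childClasses.get(name);
        if (bytes == null) {
            throw new ClassNotFoundException(name);
        }
        return defineClass(name, bytes, 0, bytes.length);
    }
}
```

   With an empty child-class map, classes resolvable by the parent (such as 
`java.lang.String`) still load normally, while unknown names fail in both 
loaders and surface as `ClassNotFoundException`.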




[GitHub] [flink] TisonKun commented on issue #10369: [FLINK-14893][flink-core]Try child classLoader when parent classLoader could not load in parentFirstPatterns

2019-12-05 Thread GitBox
TisonKun commented on issue #10369: [FLINK-14893][flink-core]Try child 
classLoader when parent classLoader could not load in parentFirstPatterns
URL: https://github.com/apache/flink/pull/10369#issuecomment-562445489
 
 
   > > @wangxlong thanks for your contribution. Changes look good to me. It 
would be better if you can write a test for the specific case and I think it is 
somewhat necessary :-)
   > 
   > Hi @TisonKun . I am sorry for replying late. I found ClassLoader#loadClass 
will invoke findClass when loadClass failed. So it is not necessary to invoke 
findClass by ourselves. What do you think of it?
   
   OK, I think that is exactly why we tend to add a test to pin down the 
behavior :-)
   
   IIUC you mean that with or without this patch, classloading always works as 
expected, right?




[GitHub] [flink] xintongsong edited a comment on issue #10146: [FLINK-14188][runtime] TaskExecutor derive and register with default slot resource profile

2019-12-05 Thread GitBox
xintongsong edited a comment on issue #10146: [FLINK-14188][runtime] 
TaskExecutor derive and register with default slot resource profile
URL: https://github.com/apache/flink/pull/10146#issuecomment-562425217
 
 
   PR updated.
   As discussed with Andrey and Till offline, we removed the changes that 
introduce the default slot fraction, to avoid public API changes that would 
not actually be used in the upcoming 1.10 release.
   Travis passed: https://travis-ci.org/xintongsong/flink/builds/621435726




  1   2   3   4   5   6   7   8   9   10   >