This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit f1c36904bc52579ec50877717f8a98addc87c136 Author: Kostas Kloudas <kklou...@gmail.com> AuthorDate: Tue Nov 5 13:31:56 2019 +0100 [FLINK-XXXXX] Add the ExecutorFactories for Session and Per-Job --- .../executors/JobClusterExecutorFactory.java | 57 ++++++++++++++++++++++ .../executors/SessionClusterExecutorFactory.java | 57 ++++++++++++++++++++++ ...org.apache.flink.core.execution.ExecutorFactory | 17 +++++++ 3 files changed, 131 insertions(+) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java new file mode 100644 index 0000000..5e10984 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutorFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.ClusterMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
+ */
+@Internal
+public class JobClusterExecutorFactory implements ExecutorFactory {
+
+    private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+    public JobClusterExecutorFactory() {
+        this(new DefaultClusterClientServiceLoader()); // default loader; the no-arg constructor is what java.util.ServiceLoader instantiates
+    }
+
+    public JobClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
+        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); // fail fast on a null loader
+    }
+
+    @Override
+    public boolean isCompatibleWith(Configuration configuration) {
+        return ClusterMode.PER_JOB.equals(configuration.get(DeploymentOptions.CLUSTER_MODE)); // constant-first: a null option value means "not compatible", not an NPE
+    }
+
+    @Override
+    public Executor getExecutor(Configuration configuration) {
+        return new JobClusterExecutor<>(clusterClientServiceLoader); // 'configuration' is not consulted here; only the loader is handed over
+    }
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
new file mode 100644
index 0000000..8e5a5eb
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutorFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.deployment.executors; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.client.deployment.ClusterClientServiceLoader; +import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; +import org.apache.flink.configuration.ClusterMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.execution.Executor; +import org.apache.flink.core.execution.ExecutorFactory; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster. 
+ */
+@Internal
+public class SessionClusterExecutorFactory implements ExecutorFactory {
+
+    private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+    public SessionClusterExecutorFactory() {
+        this(new DefaultClusterClientServiceLoader()); // default loader; the no-arg constructor is what java.util.ServiceLoader instantiates
+    }
+
+    public SessionClusterExecutorFactory(final ClusterClientServiceLoader clusterClientServiceLoader) {
+        this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader); // fail fast on a null loader
+    }
+
+    @Override
+    public boolean isCompatibleWith(Configuration configuration) {
+        return ClusterMode.SESSION.equals(configuration.get(DeploymentOptions.CLUSTER_MODE)); // constant-first: a null option value means "not compatible", not an NPE
+    }
+
+    @Override
+    public Executor getExecutor(Configuration configuration) {
+        return new SessionClusterExecutor<>(clusterClientServiceLoader); // 'configuration' is not consulted here; only the loader is handed over
+    }
+}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..870c57d
--- /dev/null
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +org.apache.flink.client.deployment.executors.JobClusterExecutorFactory +org.apache.flink.client.deployment.executors.SessionClusterExecutorFactory \ No newline at end of file