This is an automated email from the ASF dual-hosted git repository.

wangyang pushed a commit to branch spark-operator
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 8be651e3f4dc066a8591d6f3d8dc009c6f87813f
Author: xiaohzho <xiaoh...@cisco.com>
AuthorDate: Wed Aug 23 14:26:47 2023 +0800

    [Spark][Operator] Add Spark operator CRD
---
 .../plugin/task/spark/crd/ApplicationState.java    |  36 ++
 .../task/spark/crd/ApplicationStateType.java       |  25 ++
 .../spark/crd/BatchSchedulerConfiguration.java     |  26 ++
 .../plugin/task/spark/crd/Dependencies.java        |  41 ++
 .../plugin/task/spark/crd/DriverInfo.java          |  87 ++++
 .../plugin/task/spark/crd/DriverSpec.java          | 224 ++++++++++
 .../plugin/task/spark/crd/DynamicAllocation.java   |  90 ++++
 .../plugin/task/spark/crd/ExecutorSpec.java        | 217 ++++++++++
 .../plugin/task/spark/crd/MonitoringSpec.java      |  87 ++++
 .../plugin/task/spark/crd/NamePath.java            |   7 +
 .../plugin/task/spark/crd/Port.java                |  11 +
 .../plugin/task/spark/crd/PrometheusSpec.java      |  73 ++++
 .../plugin/task/spark/crd/RestartPolicy.java       |  83 ++++
 .../plugin/task/spark/crd/SecretInfo.java          |   6 +
 .../plugin/task/spark/crd/SparkApplication.java    |  30 ++
 .../task/spark/crd/SparkApplicationList.java       |   7 +
 .../task/spark/crd/SparkApplicationSpec.java       | 481 +++++++++++++++++++++
 .../task/spark/crd/SparkApplicationStatus.java     | 151 +++++++
 .../task/spark/crd/SparkUIConfiguration.java       |  94 ++++
 19 files changed, 1776 insertions(+)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationState.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationState.java
new file mode 100644
index 0000000000..d13c0f0c06
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationState.java
@@ -0,0 +1,36 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ApplicationState {
+
+  ApplicationStateType state;
+
+  String errorMessage;
+
+
+  public ApplicationStateType getState() {
+    return state;
+  }
+
+  public void setState(ApplicationStateType state) {
+    this.state = state;
+  }
+
+  public String getErrorMessage() {
+    return errorMessage;
+  }
+
+  public void setErrorMessage(String errorMessage) {
+    this.errorMessage = errorMessage;
+  }
+
+  @Override
+  public String toString() {
+    return "ApplicationState{" +
+        "state=" + state +
+        ", errorMessage='" + errorMessage + '\'' +
+        '}';
+  }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationStateType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationStateType.java
new file mode 100644
index 0000000000..c35a30856c
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ApplicationStateType.java
@@ -0,0 +1,25 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+public enum ApplicationStateType {
+  COMPLETED,
+
+  FAILED,
+
+  SUBMISSION_FAILED,
+
+  FAILING,
+
+  INVALIDATING,
+
+  PENDING_RERUN,
+
+  RUNNING,
+
+  SUBMITTED,
+
+  SUCCEEDING,
+
+  FINISHED,
+
+  UNKNOWN
+}
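
Review note: several of these states describe an application that is still in flight (SUBMITTED, RUNNING, SUCCEEDING, FAILING, INVALIDATING, PENDING_RERUN), so callers polling the CRD status have to decide which values are terminal. A minimal sketch of such a check; the helper class and the choice of terminal states are illustrative assumptions, not part of this commit:

    import java.util.EnumSet;
    import java.util.Set;

    // Hypothetical helper, not part of this commit.
    public final class ApplicationStates {

        // Assumed terminal states: the operator is not expected to move the
        // application further on its own once one of these is reached.
        private static final Set<ApplicationStateType> TERMINAL =
            EnumSet.of(ApplicationStateType.COMPLETED,
                ApplicationStateType.FINISHED,
                ApplicationStateType.FAILED,
                ApplicationStateType.SUBMISSION_FAILED);

        private ApplicationStates() {
        }

        public static boolean isTerminal(ApplicationStateType state) {
            return state != null && TERMINAL.contains(state);
        }
    }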
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/BatchSchedulerConfiguration.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/BatchSchedulerConfiguration.java
new file mode 100644
index 0000000000..e688ffa07c
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/BatchSchedulerConfiguration.java
@@ -0,0 +1,26 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import io.fabric8.kubernetes.api.model.DefaultKubernetesResourceList;
+
+public class BatchSchedulerConfiguration {
+
+  /**
+   * (Optional) Queue stands for the resource queue which the application belongs to, it’s being
+   * used in Volcano batch scheduler.
+   */
+  String queue;
+
+  /**
+   * (Optional) PriorityClassName stands for the name of k8s PriorityClass resource, it’s being
+   * used in Volcano batch scheduler.
+   */
+  String priorityClassName;
+
+  /**
+   * (Optional) Resources stands for the resource list custom request for. Usually it is used to
+   * define the lower-bound limit. If specified, volcano scheduler will consider it as the
+   * resources requested.
+   */
+  DefaultKubernetesResourceList resources;
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Dependencies.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Dependencies.java
new file mode 100644
index 0000000000..607c931697
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Dependencies.java
@@ -0,0 +1,41 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import java.util.List;
+
+public class Dependencies {
+
+  /**
+   * (Optional) Jars is a list of JAR files the Spark application depends on.
+   */
+  List<String> jars;
+
+  /**
+   * (Optional) Files is a list of files the Spark application depends on.
+   */
+  List<String> files;
+
+  /**
+   * (Optional) PyFiles is a list of Python files the Spark application depends on.
+   */
+  List<String> pyFiles;
+
+  /**
+   * (Optional) Packages is a list of maven coordinates of jars to include on the driver and
+   * executor classpaths. This will search the local maven repo, then maven central and any
+   * additional remote repositories given by the “repositories” option. Each package should be of
+   * the form “groupId:artifactId:version”.
+   */
+  List<String> packages;
+
+  /**
+   * (Optional) ExcludePackages is a list of “groupId:artifactId”, to exclude while resolving the
+   * dependencies provided in Packages to avoid dependency conflicts.
+   */
+  List<String> excludePackages;
+
+  /**
+   * (Optional) Repositories is a list of additional remote repositories to search for the maven
+   * coordinate given with the “packages” option.
+   */
+  List<String> repositories;
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverInfo.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverInfo.java
new file mode 100644
index 0000000000..c915b67679
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverInfo.java
@@ -0,0 +1,87 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DriverInfo {
+
+  String webUIServiceName;
+
+  /**
+   * UI Details for the UI created via ClusterIP service accessible from within the cluster.
+   */
+  int webUIPort;
+
+
+  String webUIAddress;
+
+  /**
+   * Ingress Details if an ingress for the UI was created.
+   */
+  String webUIIngressName;
+
+
+  String webUIIngressAddress;
+
+  String podName;
+
+  @Override
+  public String toString() {
+    return "DriverInfo{" +
+        "webUIServiceName='" + webUIServiceName + '\'' +
+        ", webUIPort=" + webUIPort +
+        ", webUIAddress='" + webUIAddress + '\'' +
+        ", webUIIngressName='" + webUIIngressName + '\'' +
+        ", webUIIngressAddress='" + webUIIngressAddress + '\'' +
+        ", podName='" + podName + '\'' +
+        '}';
+  }
+
+  public String getWebUIServiceName() {
+    return webUIServiceName;
+  }
+
+  public void setWebUIServiceName(String webUIServiceName) {
+    this.webUIServiceName = webUIServiceName;
+  }
+
+  public int getWebUIPort() {
+    return webUIPort;
+  }
+
+  public void setWebUIPort(int webUIPort) {
+    this.webUIPort = webUIPort;
+  }
+
+  public String getWebUIAddress() {
+    return webUIAddress;
+  }
+
+  public void setWebUIAddress(String webUIAddress) {
+    this.webUIAddress = webUIAddress;
+  }
+
+  public String getWebUIIngressName() {
+    return webUIIngressName;
+  }
+
+  public void setWebUIIngressName(String webUIIngressName) {
+    this.webUIIngressName = webUIIngressName;
+  }
+
+  public String getWebUIIngressAddress() {
+    return webUIIngressAddress;
+  }
+
+  public void setWebUIIngressAddress(String webUIIngressAddress) {
+    this.webUIIngressAddress = webUIIngressAddress;
+  }
+
+  public String getPodName() {
+    return podName;
+  }
+
+  public void setPodName(String podName) {
+    this.podName = podName;
+  }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverSpec.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverSpec.java
new file mode 100644
index 0000000000..8868bbb11d
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DriverSpec.java
@@ -0,0 +1,224 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.kubernetes.api.model.Affinity;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.EnvFromSource;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.HostAlias;
+import io.fabric8.kubernetes.api.model.Lifecycle;
+import io.fabric8.kubernetes.api.model.PodDNSConfig;
+import io.fabric8.kubernetes.api.model.Toleration;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@ToString
+public class DriverSpec {
+
+
+  /**
+   * (Optional) PodName is the name of the driver pod that the user creates. This is used for the
+   * in-cluster client mode in which the user creates a client pod where the driver of the user
+   * application runs. It’s an error to set this field if Mode is not in-cluster-client.
+   */
+  String podName;
+
+  /**
+   * (Optional) CoreRequest is the physical CPU core request for the driver. Maps to
+   * spark.kubernetes.driver.request.cores that is available since Spark 3.0.
+   */
+  String coreRequest;
+
+  /**
+   * (Optional) JavaOptions is a String of extra JVM options to pass to the driver. For instance,
+   * GC settings or other logging.
+   */
+  String javaOptions;
+
+  /**
+   * (Optional) Lifecycle for running preStop or postStart commands
+   */
+  Lifecycle lifecycle;
+
+  /**
+   * (Optional) KubernetesMaster is the URL of the Kubernetes master used by the driver to manage
+   * executor pods and other Kubernetes resources. Default to https://kubernetes.default.svc.
+   */
+  String kubernetesMaster;
+
+  /**
+   * (Optional) ServiceAnnotations defines the annotations to be added to the Kubernetes headless
+   * service used by executors to connect to the driver.
+   */
+  Map<String, String> serviceAnnotations;
+
+  /**
+   * (Optional) Ports settings for the pods, following the Kubernetes specifications.
+   */
+  List<Port> ports;
+
+
+  /**
+   * (Optional) Cores maps to spark.driver.cores or spark.executor.cores for the driver and
+   * executors, respectively.
+   */
+  Integer cores;
+
+  /**
+   * CoreLimit specifies a hard limit on CPU cores for the pod. Optional
+   */
+  String coreLimit;
+
+  /**
+   * (Optional) Memory is the amount of memory to request for the pod.
+   */
+  String memory;
+
+
+  /**
+   * (Optional) MemoryOverhead is the amount of off-heap memory to allocate in cluster mode, in
+   * MiB unless otherwise specified.
+   */
+  String memoryOverhead;
+
+
+  /**
+   * (Optional) Image is the container image to use. Overrides Spec.Image if set.
+   */
+  String image;
+
+
+  /**
+   * (Optional) ConfigMaps carries information of other ConfigMaps to add to the pod.
+   */
+  List<NamePath> configMaps;
+
+
+  /**
+   * (Optional) Env carries the environment variables to add to the pod.
+   */
+  List<EnvVar> env;
+
+  /**
+   * (Optional) EnvVars carries the environment variables to add to the pod. Deprecated. Consider
+   * using env instead.
+   */
+  Map<String, String> envVars;
+
+  /**
+   * (Optional) EnvFrom is a list of sources to populate environment variables in the container.
+   */
+  List<EnvFromSource> envFrom;
+
+
+  /**
+   * (Optional) Labels are the Kubernetes labels to be added to the pod.
+   */
+  Map<String, String> labels;
+
+  /**
+   * (Optional) Annotations are the Kubernetes annotations to be added to the pod.
+   */
+  Map<String, String> annotations;
+
+  /**
+   * (Optional) VolumeMounts specifies the volumes listed in “.spec.volumes” to mount into the
+   * main container’s filesystem.
+   */
+  List<VolumeMount> volumeMounts;
+
+  /**
+   * (Optional) Affinity specifies the affinity/anti-affinity settings for the pod.
+   */
+  Affinity affinity;
+
+  /**
+   * (Optional) Tolerations specifies the tolerations listed in “.spec.tolerations” to be applied
+   * to the pod.
+   */
+  List<Toleration> tolerations;
+
+  /**
+   * (Optional) PodSecurityContext specifies the PodSecurityContext to apply.
+   */
+  io.fabric8.kubernetes.api.model.PodSecurityContext podSecurityContext;
+
+  /**
+   * (Optional) SecurityContext specifies the container’s SecurityContext to apply.
+   */
+  io.fabric8.kubernetes.api.model.SecurityContext securityContext;
+
+  /**
+   * (Optional) SchedulerName specifies the scheduler that will be used for scheduling.
+   */
+  String schedulerName;
+
+  /**
+   * (Optional) Sidecars is a list of sidecar containers that run alongside the main Spark
+   * container.
+   */
+  List<Container> sidecars;
+
+  /**
+   * (Optional) InitContainers is a list of init-containers that run to completion before the main
+   * Spark container.
+   */
+  List<Container> initContainers;
+
+  /**
+   * (Optional) HostNetwork indicates whether to request host networking for the pod or not.
+   */
+  boolean hostNetwork;
+
+  /**
+   * (Optional) NodeSelector is the Kubernetes node selector to be added to the driver and
+   * executor pods. This field is mutually exclusive with nodeSelector at SparkApplication level
+   * (which will be deprecated).
+   */
+  Map<String, String> nodeSelector;
+
+  /**
+   * (Optional) DnsConfig dns settings for the pod, following the Kubernetes specifications.
+   */
+  PodDNSConfig dnsConfig;
+
+  /**
+   * (Optional) Termination grace period seconds for the pod
+   */
+  long terminationGracePeriodSeconds;
+
+  /**
+   * (Optional) ServiceAccount is the name of the custom Kubernetes service account used by the
+   * pod.
+   */
+  String serviceAccount;
+
+  /**
+   * (Optional) HostAliases settings for the pod, following the Kubernetes specifications.
+   */
+  List<HostAlias> hostAliases;
+  /**
+   * (Optional) ShareProcessNamespace settings for the pod, following the Kubernetes
+   * specifications.
+   */
+  boolean shareProcessNamespace;
+
+  public DriverSpec() {
+  }
+
+  public void addEnv(EnvVar var) {
+    if (this.env == null) {
+      this.env = new ArrayList<>();
+    }
+    this.env.add(var);
+  }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DynamicAllocation.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DynamicAllocation.java
new file mode 100644
index 0000000000..33677e870f
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/DynamicAllocation.java
@@ -0,0 +1,90 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DynamicAllocation {
+
+  /**
+   * Enabled controls whether dynamic allocation is enabled or not.
+   */
+  boolean enabled;
+
+  /**
+   * (Optional) InitialExecutors is the initial number of executors to request. If
+   * .spec.executor.instances is also set, the initial number of executors is set to the bigger of
+   * that and this option.
+   */
+  int initialExecutors;
+
+  /**
+   * (Optional) MinExecutors is the lower bound for the number of executors if dynamic allocation
+   * is enabled.
+   */
+  int minExecutors;
+
+  /**
+   * (Optional) MaxExecutors is the upper bound for the number of executors if dynamic allocation
+   * is enabled.
+   */
+  int maxExecutors;
+
+  /**
+   * (Optional) ShuffleTrackingTimeout controls the timeout in milliseconds for executors that are
+   * holding shuffle data if shuffle tracking is enabled (true by default if dynamic allocation is
+   * enabled).
+   */
+  long shuffleTrackingTimeout;
+
+  public DynamicAllocation(int maxExecutors) {
+    this.maxExecutors = maxExecutors;
+    this.enabled = true;
+    this.initialExecutors = maxExecutors / 4;
+    this.minExecutors = 1;
+    this.shuffleTrackingTimeout = 300000;
+  }
+
+  public DynamicAllocation() {
+  }
+
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  public void setEnabled(boolean enabled) {
+    this.enabled = enabled;
+  }
+
+  public int getInitialExecutors() {
+    return initialExecutors;
+  }
+
+  public void setInitialExecutors(int initialExecutors) {
+    this.initialExecutors = initialExecutors;
+  }
+
+  public int getMinExecutors() {
+    return minExecutors;
+  }
+
+  public void setMinExecutors(int minExecutors) {
+    this.minExecutors = minExecutors;
+  }
+
+  public int getMaxExecutors() {
+    return maxExecutors;
+  }
+
+  public void setMaxExecutors(int maxExecutors) {
+    this.maxExecutors = maxExecutors;
+  }
+
+  public long getShuffleTrackingTimeout() {
+    return shuffleTrackingTimeout;
+  }
+
+  public void setShuffleTrackingTimeout(long shuffleTrackingTimeout) {
+    this.shuffleTrackingTimeout = shuffleTrackingTimeout;
+  }
+
+}
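
The single-argument constructor above bakes in defaults: allocation enabled, one executor minimum, a quarter of the maximum as the initial request, and a five-minute shuffle tracking timeout. An illustration of what a caller gets (hypothetical usage, not part of this commit):

    DynamicAllocation allocation = new DynamicAllocation(8);
    allocation.isEnabled();                  // true
    allocation.getMinExecutors();            // 1
    allocation.getInitialExecutors();        // 2 (8 / 4, integer division)
    allocation.getMaxExecutors();            // 8
    allocation.getShuffleTrackingTimeout();  // 300000 ms, i.e. 5 minutes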
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ExecutorSpec.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ExecutorSpec.java
new file mode 100644
index 0000000000..4f78441ca9
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/ExecutorSpec.java
@@ -0,0 +1,217 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.kubernetes.api.model.Affinity;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.EnvFromSource;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.HostAlias;
+import io.fabric8.kubernetes.api.model.PodDNSConfig;
+import io.fabric8.kubernetes.api.model.Toleration;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+@Getter
+@Setter
+@ToString
+public class ExecutorSpec {
+
+  /**
+   * (Optional) Instances is the number of executor instances.
+   */
+  int instances;
+
+  /**
+   * (Optional) CoreRequest is the physical CPU core request for the executors. Maps to
+   * spark.kubernetes.executor.request.cores that is available since Spark 2.4.
+   */
+  String coreRequest;
+
+  /**
+   * (Optional) JavaOptions is a String of extra JVM options to pass to the executors. For
+   * instance, GC settings or other logging.
+   */
+  String javaOptions;
+
+  /**
+   * (Optional) DeleteOnTermination specifies whether executor pods should be deleted in case of
+   * failure or normal termination. Maps to spark.kubernetes.executor.deleteOnTermination that is
+   * available since Spark 3.0.
+   */
+  boolean deleteOnTermination;
+
+  /**
+   * (Optional) Ports settings for the pods, following the Kubernetes specifications.
+   */
+  List<Port> ports;
+
+
+  /**
+   * (Optional) Cores maps to spark.driver.cores or spark.executor.cores for the driver and
+   * executors, respectively.
+   */
+  Integer cores;
+
+  /**
+   * CoreLimit specifies a hard limit on CPU cores for the pod. Optional
+   */
+  String coreLimit;
+
+  /**
+   * (Optional) Memory is the amount of memory to request for the pod.
+   */
+  String memory;
+
+
+  /**
+   * (Optional) MemoryOverhead is the amount of off-heap memory to allocate in cluster mode, in
+   * MiB unless otherwise specified.
+   */
+  String memoryOverhead;
+
+  /**
+   * (Optional) This sets the Memory Overhead Factor that will allocate memory to non-JVM memory.
+   * For JVM-based jobs this value will default to 0.10, for non-JVM jobs 0.40. Value of this
+   * field will be overridden by Spec.Driver.MemoryOverhead and Spec.Executor.MemoryOverhead if
+   * they are set.
+   */
+  String memoryOverheadFactor = "0.20";
+
+  /**
+   * (Optional) Image is the container image to use. Overrides Spec.Image if set.
+   */
+  String image;
+
+
+  /**
+   * (Optional) ConfigMaps carries information of other ConfigMaps to add to the pod.
+   */
+  List<NamePath> configMaps;
+
+
+  /**
+   * (Optional) Env carries the environment variables to add to the pod.
+   */
+  List<EnvVar> env;
+
+  /**
+   * (Optional) EnvVars carries the environment variables to add to the pod. Deprecated. Consider
+   * using env instead.
+   */
+  Map<String, String> envVars;
+
+  /**
+   * (Optional) EnvFrom is a list of sources to populate environment variables in the container.
+   */
+  List<EnvFromSource> envFrom;
+
+
+  /**
+   * (Optional) Labels are the Kubernetes labels to be added to the pod.
+   */
+  Map<String, String> labels;
+
+  /**
+   * (Optional) Annotations are the Kubernetes annotations to be added to the pod.
+   */
+  Map<String, String> annotations;
+
+  /**
+   * (Optional) VolumeMounts specifies the volumes listed in “.spec.volumes” to mount into the
+   * main container’s filesystem.
+   */
+  List<VolumeMount> volumeMounts;
+
+  /**
+   * (Optional) Affinity specifies the affinity/anti-affinity settings for the pod.
+   */
+  Affinity affinity;
+
+  /**
+   * (Optional) Tolerations specifies the tolerations listed in “.spec.tolerations” to be applied
+   * to the pod.
+   */
+  List<Toleration> tolerations;
+
+  /**
+   * (Optional) PodSecurityContext specifies the PodSecurityContext to apply.
+   */
+  io.fabric8.kubernetes.api.model.PodSecurityContext podSecurityContext;
+
+  /**
+   * (Optional) SecurityContext specifies the container’s SecurityContext to apply.
+   */
+  io.fabric8.kubernetes.api.model.SecurityContext securityContext;
+
+  /**
+   * (Optional) SchedulerName specifies the scheduler that will be used for scheduling.
+   */
+  String schedulerName;
+
+  /**
+   * (Optional) Sidecars is a list of sidecar containers that run alongside the main Spark
+   * container.
+   */
+  List<Container> sidecars;
+
+  /**
+   * (Optional) InitContainers is a list of init-containers that run to completion before the main
+   * Spark container.
+   */
+  List<Container> initContainers;
+
+  /**
+   * (Optional) HostNetwork indicates whether to request host networking for the pod or not.
+   */
+  boolean hostNetwork;
+
+  /**
+   * (Optional) NodeSelector is the Kubernetes node selector to be added to the driver and
+   * executor pods. This field is mutually exclusive with nodeSelector at SparkApplication level
+   * (which will be deprecated).
+   */
+  Map<String, String> nodeSelector;
+
+  /**
+   * (Optional) DnsConfig dns settings for the pod, following the Kubernetes specifications.
+   */
+  PodDNSConfig dnsConfig;
+
+  /**
+   * (Optional) Termination grace period seconds for the pod
+   */
+  long terminationGracePeriodSeconds;
+
+  /**
+   * (Optional) ServiceAccount is the name of the custom Kubernetes service account used by the
+   * pod.
+   */
+  String serviceAccount;
+
+  /**
+   * (Optional) HostAliases settings for the pod, following the Kubernetes specifications.
+   */
+  List<HostAlias> hostAliases;
+  /**
+   * (Optional) ShareProcessNamespace settings for the pod, following the Kubernetes
+   * specifications.
+   */
+  boolean shareProcessNamespace;
+
+  public ExecutorSpec() {
+  }
+
+  public void addEnv(EnvVar var) {
+    if (this.env == null) {
+      this.env = new ArrayList<>();
+    }
+    this.env.add(var);
+  }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/MonitoringSpec.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/MonitoringSpec.java
new file mode 100644
index 0000000000..fffabdad34
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/MonitoringSpec.java
@@ -0,0 +1,87 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MonitoringSpec {
+
+  /**
+   * ExposeDriverMetrics specifies whether to expose metrics on the driver.
+   */
+  boolean exposeDriverMetrics;
+
+  /**
+   * ExposeExecutorMetrics specifies whether to expose metrics on the executors.
+   */
+  boolean exposeExecutorMetrics;
+
+  /**
+   * (Optional) MetricsProperties is the content of a custom metrics.properties for configuring
+   * the Spark metric system. If not specified, the content in
+   * spark-docker/conf/metrics.properties will be used.
+   */
+  String metricsProperties;
+
+  /**
+   * (Optional) MetricsPropertiesFile is the container local path of file metrics.properties for
+   * configuring the Spark metric system. If not specified, value
+   * /etc/metrics/conf/metrics.properties will be used.
+   */
+  String metricsPropertiesFile;
+
+  /**
+   * (Optional) Prometheus is for configuring the Prometheus JMX exporter.
+   */
+  PrometheusSpec prometheus;
+
+  public boolean isExposeDriverMetrics() {
+    return exposeDriverMetrics;
+  }
+
+  public void setExposeDriverMetrics(boolean exposeDriverMetrics) {
+    this.exposeDriverMetrics = exposeDriverMetrics;
+  }
+
+  public boolean isExposeExecutorMetrics() {
+    return exposeExecutorMetrics;
+  }
+
+  public void setExposeExecutorMetrics(boolean exposeExecutorMetrics) {
+    this.exposeExecutorMetrics = exposeExecutorMetrics;
+  }
+
+  public String getMetricsProperties() {
+    return metricsProperties;
+  }
+
+  public void setMetricsProperties(String metricsProperties) {
+    this.metricsProperties = metricsProperties;
+  }
+
+  public String getMetricsPropertiesFile() {
+    return metricsPropertiesFile;
+  }
+
+  public void setMetricsPropertiesFile(String metricsPropertiesFile) {
+    this.metricsPropertiesFile = metricsPropertiesFile;
+  }
+
+  public PrometheusSpec getPrometheus() {
+    return prometheus;
+  }
+
+  public void setPrometheus(PrometheusSpec prometheus) {
+    this.prometheus = prometheus;
+  }
+
+  @Override
+  public String toString() {
+    return "MonitoringSpec{" +
+        "exposeDriverMetrics=" + exposeDriverMetrics +
+        ", exposeExecutorMetrics=" + exposeExecutorMetrics +
+        ", metricsProperties='" + metricsProperties + '\'' +
+        ", metricsPropertiesFile='" + metricsPropertiesFile + '\'' +
+        ", prometheus=" + prometheus +
+        '}';
+  }
+}
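
MonitoringSpec ties driver/executor metrics exposure to the PrometheusSpec defined later in this patch. A rough sketch of wiring the two together; the exporter jar path and port are illustrative values, not defaults defined by this commit:

    // Hypothetical usage, not part of this commit.
    PrometheusSpec prometheus = new PrometheusSpec();
    prometheus.setJmxExporterJar("/prometheus/jmx_prometheus_javaagent.jar"); // assumed path in the image
    prometheus.setPort(8090);

    MonitoringSpec monitoring = new MonitoringSpec();
    monitoring.setExposeDriverMetrics(true);
    monitoring.setExposeExecutorMetrics(true);
    monitoring.setPrometheus(prometheus);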
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/NamePath.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/NamePath.java
new file mode 100644
index 0000000000..88a7feafb6
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/NamePath.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+public class NamePath {
+
+  String name;
+  String path;
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Port.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Port.java
new file mode 100644
index 0000000000..79f8eb212f
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/Port.java
@@ -0,0 +1,11 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+public class Port {
+
+  String name;
+
+  String protocol;
+
+  int containerPort;
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/PrometheusSpec.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/PrometheusSpec.java
new file mode 100644
index 0000000000..3f69b51465
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/PrometheusSpec.java
@@ -0,0 +1,73 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PrometheusSpec {
+
+  /**
+   * JmxExporterJar is the path to the Prometheus JMX exporter jar in the container.
+   */
+  String jmxExporterJar;
+
+
+  /**
+   * (Optional) Port is the port of the HTTP server run by the Prometheus JMX exporter. If not
+   * specified, 8090 will be used as the default.
+   */
+  int port;
+
+  /**
+   * (Optional) PortName is the port name of prometheus JMX exporter port. If not specified,
+   * jmx-exporter will be used as the default.
+   */
+  String portName;
+
+  /**
+   * (Optional) ConfigFile is the path to the custom Prometheus configuration file provided in the
+   * Spark image. ConfigFile takes precedence over Configuration, which is shown below.
+   */
+  String configFile;
+
+  /**
+   * (Optional) Configuration is the content of the Prometheus configuration needed by the
+   * Prometheus JMX exporter. If not specified, the content in spark-docker/conf/prometheus.yaml
+   * will be used. Configuration has no effect if ConfigFile is set.
+   */
+  String configuration;
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public String getJmxExporterJar() {
+    return jmxExporterJar;
+  }
+
+  public void setJmxExporterJar(String jmxExporterJar) {
+    this.jmxExporterJar = jmxExporterJar;
+  }
+
+  public String getConfigFile() {
+    return configFile;
+  }
+
+  public void setConfigFile(String configFile) {
+    this.configFile = configFile;
+  }
+
+  @Override
+  public String toString() {
+    return "PrometheusSpec{" +
+        "jmxExporterJar='" + jmxExporterJar + '\'' +
+        ", port=" + port +
+        ", portName='" + portName + '\'' +
+        ", configFile='" + configFile + '\'' +
+        ", configuration='" + configuration + '\'' +
+        '}';
+  }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/RestartPolicy.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/RestartPolicy.java
new file mode 100644
index 0000000000..6d5da14ada
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/RestartPolicy.java
@@ -0,0 +1,83 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RestartPolicy {
+
+  /**
+   * Type specifies the RestartPolicyType. Can be "Always", "Never" or "OnFailure".
+   */
+  String type;
+
+  /**
+   * (Optional) OnSubmissionFailureRetries is the number of times to retry submitting an
+   * application before giving up. This is best effort and actual retry attempts can be >= the
+   * value specified due to caching. These are required if RestartPolicy is OnFailure.
+   */
+  Integer onSubmissionFailureRetries;
+
+  /**
+   * (Optional) OnFailureRetries is the number of times to retry running an application before
+   * giving up.
+   */
+  Integer onFailureRetries;
+
+  /**
+   * (Optional) OnSubmissionFailureRetryInterval is the interval in seconds between retries on
+   * failed submissions.
+   */
+  Long onSubmissionFailureRetryInterval;
+
+  /**
+   * (Optional) OnFailureRetryInterval is the interval in seconds between retries on failed runs.
+   */
+  Long onFailureRetryInterval;
+
+  public RestartPolicy() {
+  }
+
+  public RestartPolicy(String type) {
+    this.type = type;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  public Integer getOnSubmissionFailureRetries() {
+    return onSubmissionFailureRetries;
+  }
+
+  public void setOnSubmissionFailureRetries(Integer onSubmissionFailureRetries) {
+    this.onSubmissionFailureRetries = onSubmissionFailureRetries;
+  }
+
+  public Integer getOnFailureRetries() {
+    return onFailureRetries;
+  }
+
+  public void setOnFailureRetries(Integer onFailureRetries) {
+    this.onFailureRetries = onFailureRetries;
+  }
+
+  public Long getOnSubmissionFailureRetryInterval() {
+    return onSubmissionFailureRetryInterval;
+  }
+
+  public void setOnSubmissionFailureRetryInterval(long onSubmissionFailureRetryInterval) {
+    this.onSubmissionFailureRetryInterval = onSubmissionFailureRetryInterval;
+  }
+
+  public Long getOnFailureRetryInterval() {
+    return onFailureRetryInterval;
+  }
+
+  public void setOnFailureRetryInterval(long onFailureRetryInterval) {
+    this.onFailureRetryInterval = onFailureRetryInterval;
+  }
+}
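
As the field comments note, the retry settings only take effect with the "OnFailure" type. A minimal sketch of building such a policy (hypothetical values, not part of this commit):

    RestartPolicy policy = new RestartPolicy("OnFailure");
    policy.setOnFailureRetries(3);
    policy.setOnFailureRetryInterval(10L);            // seconds between run retries
    policy.setOnSubmissionFailureRetries(5);
    policy.setOnSubmissionFailureRetryInterval(20L);  // seconds between submission retries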
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SecretInfo.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SecretInfo.java
new file mode 100644
index 0000000000..6b7ae7affe
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SecretInfo.java
@@ -0,0 +1,6 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+public class SecretInfo {
+
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplication.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplication.java
new file mode 100644
index 0000000000..56939f8f91
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplication.java
@@ -0,0 +1,30 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.Kind;
+import io.fabric8.kubernetes.model.annotation.Version;
+
+@Version(SparkApplication.apiVersion)
+@Group(SparkApplication.group)
+@Kind(SparkApplication.kind)
+public class SparkApplication extends
+    CustomResource<SparkApplicationSpec, SparkApplicationStatus> implements
+    Namespaced {
+
+  public static final String apiVersion = "v1beta2";
+  public static final String group = "sparkoperator.k8s.io";
+  public static final String kind = "SparkApplication";
+
+  public SparkApplication() {
+  }
+
+  @Override
+  public String toString() {
+    return "SparkApplication{" +
+        "spec=" + spec +
+        '}';
+  }
+
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplicationList.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplicationList.java
new file mode 100644
index 0000000000..1c55d211b7
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplicationList.java
@@ -0,0 +1,7 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import io.fabric8.kubernetes.client.CustomResourceList;
+
+public class SparkApplicationList extends CustomResourceList<SparkApplication> {
+
+}
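
With the @Group/@Version/@Kind annotations on SparkApplication plus this list type, the fabric8 client can operate on the CRD in a typed way. A minimal sketch, assuming a fabric8 6.x client and a cluster with the spark-operator CRD installed (hypothetical usage, not part of this commit):

    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientBuilder;
    import io.fabric8.kubernetes.client.dsl.MixedOperation;
    import io.fabric8.kubernetes.client.dsl.Resource;

    try (KubernetesClient client = new KubernetesClientBuilder().build()) {
        MixedOperation<SparkApplication, SparkApplicationList, Resource<SparkApplication>> ops =
            client.resources(SparkApplication.class, SparkApplicationList.class);

        // List the SparkApplications in a namespace and print their states.
        // getStatus() can be null for objects the operator has not reconciled yet.
        for (SparkApplication app : ops.inNamespace("spark-jobs").list().getItems()) {
            System.out.println(app.getMetadata().getName() + " -> "
                + (app.getStatus() == null ? "no status" : app.getStatus().getApplicationState()));
        }
    }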
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplicationSpec.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplicationSpec.java
new file mode 100644
index 0000000000..37cd3b35b8
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplicationSpec.java
@@ -0,0 +1,481 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.kubernetes.api.model.Volume;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkApplicationSpec {
+
+  /**
+   * Type tells the type of the Spark application. Can be "Java", "Python", "R" or "Scala".
+   */
+  String type;
+
+  /**
+   * SparkVersion is the version of Spark the application uses.
+   */
+  String sparkVersion;
+
+  /**
+   * Mode is the deployment mode of the Spark application. Can be "client", "cluster" or
+   * "in-cluster-client".
+   */
+  String mode;
+
+  /**
+   * (Optional) ProxyUser specifies the user to impersonate when submitting the application. It
+   * maps to the command-line flag “--proxy-user” in spark-submit.
+   */
+  String proxyUser;
+
+  /**
+   * (Optional) Image is the container image for the driver, executor, and init-container. Any
+   * custom container images for the driver, executor, or init-container take precedence over
+   * this.
+   */
+  String image;
+
+  /**
+   * (Optional) ImagePullPolicy is the image pull policy for the driver, executor, and
+   * init-container.
+   */
+  String imagePullPolicy;
+
+  /**
+   * (Optional) ImagePullSecrets is the list of image-pull secrets.
+   */
+  List<String> imagePullSecrets;
+
+  /**
+   * (Optional) MainClass is the fully-qualified main class of the Spark application. This only
+   * applies to Java/Scala Spark applications.
+   */
+  String mainClass;
+
+  /**
+   * (Optional) MainFile is the path to a bundled JAR, Python, or R file of the application.
+   */
+  String mainApplicationFile;
+
+  /**
+   * (Optional) Arguments is a list of arguments to be passed to the application.
+   */
+  List<String> arguments;
+
+  /**
+   * (Optional) SparkConf carries user-specified Spark configuration properties as they would use
+   * the “--conf” option in spark-submit.
+   */
+  Map<String, String> sparkConf;
+
+  /**
+   * (Optional) HadoopConf carries user-specified Hadoop configuration properties as they would
+   * use the “--conf” option in spark-submit. The SparkApplication controller automatically adds
+   * the prefix “spark.hadoop.” to Hadoop configuration properties.
+   */
+  Map<String, String> hadoopConf;
+
+  /**
+   * (Optional) SparkConfigMap carries the name of the ConfigMap containing Spark configuration
+   * files such as log4j.properties. The controller will add environment variable SPARK_CONF_DIR
+   * to the path where the ConfigMap is mounted to.
+   */
+  String sparkConfigMap;
+
+  /**
+   * (Optional) HadoopConfigMap carries the name of the ConfigMap containing Hadoop configuration
+   * files such as core-site.xml. The controller will add environment variable HADOOP_CONF_DIR to
+   * the path where the ConfigMap is mounted to.
+   */
+  String hadoopConfigMap;
+
+  /**
+   * (Optional) Volumes is the list of Kubernetes volumes that can be mounted by the driver and/or
+   * executors.
+   */
+  List<Volume> volumes;
+
+  /**
+   * Driver is the driver specification.
+   */
+  DriverSpec driver;
+
+  /**
+   * Executor is the executor specification.
+   */
+  ExecutorSpec executor;
+
+  /**
+   * (Optional) Deps captures all possible types of dependencies of a Spark application.
+   */
+  Dependencies deps;
+
+  /**
+   * RestartPolicy defines the policy on if and in which conditions the controller should restart
+   * an application.
+   */
+  RestartPolicy restartPolicy;
+
+  /**
+   * (Optional) NodeSelector is the Kubernetes node selector to be added to the driver and
+   * executor pods. This field is mutually exclusive with nodeSelector at podSpec level (driver or
+   * executor). This field will be deprecated in future versions (at SparkApplicationSpec level).
+   */
+  Map<String, String> nodeSelector;
+
+  /**
+   * (Optional) FailureRetries is the number of times to retry a failed application before giving
+   * up. This is best effort and actual retry attempts can be >= the value specified.
+   */
+  Integer failureRetries;
+
+  /**
+   * (Optional) RetryInterval is the unit of intervals in seconds between submission retries.
+   */
+  long retryInterval;
+
+  /**
+   * (Optional) This sets the major Python version of the docker image used to run the driver and
+   * executor containers. Can either be 2 or 3, default 2.
+   */
+  String pythonVersion;
+
+  /**
+   * (Optional) This sets the Memory Overhead Factor that will allocate memory to non-JVM memory.
+   * For JVM-based jobs this value will default to 0.10, for non-JVM jobs 0.40. Value of this
+   * field will be overridden by Spec.Driver.MemoryOverhead and Spec.Executor.MemoryOverhead if
+   * they are set.
+   */
+  String memoryOverheadFactor;
+
+  /**
+   * (Optional) Monitoring configures how monitoring is handled.
+   */
+  MonitoringSpec monitoring;
+
+  /**
+   * (Optional) BatchScheduler configures which batch scheduler will be used for scheduling.
+   */
+  String batchScheduler;
+
+
+  /**
+   * (Optional) TimeToLiveSeconds defines the Time-To-Live (TTL) duration in seconds for this
+   * SparkApplication after its termination. The SparkApplication object will be garbage collected
+   * if the current time is more than the TimeToLiveSeconds since its termination.
+   */
+  long timeToLiveSeconds;
+
+  /**
+   * (Optional) BatchSchedulerOptions provides fine-grained control over batch scheduling.
+   */
+  BatchSchedulerConfiguration batchSchedulerOptions;
+
+  /**
+   * (Optional) SparkUIOptions allows configuring the Service and the Ingress to expose the
+   * sparkUI.
+   */
+  SparkUIConfiguration sparkUIOptions;
+
+  /**
+   * (Optional) DynamicAllocation configures dynamic allocation that becomes available for the
+   * Kubernetes scheduler backend since Spark 3.0.
+   */
+  DynamicAllocation dynamicAllocation;
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  public String getSparkVersion() {
+    return sparkVersion;
+  }
+
+  public void setSparkVersion(String sparkVersion) {
+    this.sparkVersion = sparkVersion;
+  }
+
+  public String getMode() {
+    return mode;
+  }
+
+  public void setMode(String mode) {
+    this.mode = mode;
+  }
+
+  public String getProxyUser() {
+    return proxyUser;
+  }
+
+  public void setProxyUser(String proxyUser) {
+    this.proxyUser = proxyUser;
+  }
+
+  public String getImage() {
+    return image;
+  }
+
+  public void setImage(String image) {
+    this.image = image;
+  }
+
+  public String getImagePullPolicy() {
+    return imagePullPolicy;
+  }
+
+  public void setImagePullPolicy(String imagePullPolicy) {
+    this.imagePullPolicy = imagePullPolicy;
+  }
+
+  public List<String> getImagePullSecrets() {
+    return imagePullSecrets;
+  }
+
+  public void setImagePullSecrets(List<String> imagePullSecrets) {
+    this.imagePullSecrets = imagePullSecrets;
+  }
+
+  public String getMainClass() {
+    return mainClass;
+  }
+
+  public void setMainClass(String mainClass) {
+    this.mainClass = mainClass;
+  }
+
+  public String getMainApplicationFile() {
+    return mainApplicationFile;
+  }
+
+  public void setMainApplicationFile(String mainApplicationFile) {
+    this.mainApplicationFile = mainApplicationFile;
+  }
+
+  public List<String> getArguments() {
+    return arguments;
+  }
+
+  public void setArguments(List<String> arguments) {
+    this.arguments = arguments;
+  }
+
+  public Map<String, String> getSparkConf() {
+    return sparkConf;
+  }
+
+  public void addSparkConf(Map<String, String> sparkConf) {
+    if (this.sparkConf == null) {
+      this.sparkConf = sparkConf;
+    } else {
+      this.sparkConf.putAll(sparkConf);
+    }
+  }
+
+  public Map<String, String> getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public void setHadoopConf(Map<String, String> hadoopConf) {
+    this.hadoopConf = hadoopConf;
+  }
+
+  public String getSparkConfigMap() {
+    return sparkConfigMap;
+  }
+
+  public void setSparkConfigMap(String sparkConfigMap) {
+    this.sparkConfigMap = sparkConfigMap;
+  }
+
+  public String getHadoopConfigMap() {
+    return hadoopConfigMap;
+  }
+
+  public void setHadoopConfigMap(String hadoopConfigMap) {
+    this.hadoopConfigMap = hadoopConfigMap;
+  }
+
+  public List<Volume> getVolumes() {
+    return volumes;
+  }
+
+  public void setVolumes(List<Volume> volumes) {
+    this.volumes = volumes;
+  }
+
+  public DriverSpec getDriver() {
+    return driver;
+  }
+
+  public void setDriver(DriverSpec driver) {
+    this.driver = driver;
+  }
+
+  public ExecutorSpec getExecutor() {
+    return executor;
+  }
+
+  public void setExecutor(ExecutorSpec executor) {
+    this.executor = executor;
+  }
+
+  public Dependencies getDeps() {
+    return deps;
+  }
+
+  public void setDeps(Dependencies deps) {
+    this.deps = deps;
+  }
+
+  public RestartPolicy getRestartPolicy() {
+    return restartPolicy;
+  }
+
+  public void setRestartPolicy(RestartPolicy restartPolicy) {
+    this.restartPolicy = restartPolicy;
+  }
+
+  public Map<String, String> getNodeSelector() {
+    return nodeSelector;
+  }
+
+  public void setNodeSelector(Map<String, String> nodeSelector) {
+    this.nodeSelector = nodeSelector;
+  }
+
+  public Integer getFailureRetries() {
+    return failureRetries;
+  }
+
+  public void setFailureRetries(Integer failureRetries) {
+    this.failureRetries = failureRetries;
+  }
+
+  public long getRetryInterval() {
+    return retryInterval;
+  }
+
+  public void setRetryInterval(long retryInterval) {
+    this.retryInterval = retryInterval;
+  }
+
+  public String getPythonVersion() {
+    return pythonVersion;
+  }
+
+  public void setPythonVersion(String pythonVersion) {
+    this.pythonVersion = pythonVersion;
+  }
+
+  public String getMemoryOverheadFactor() {
+    return memoryOverheadFactor;
+  }
+
+  public void setMemoryOverheadFactor(String memoryOverheadFactor) {
+    this.memoryOverheadFactor = memoryOverheadFactor;
+  }
+
+  public MonitoringSpec getMonitoring() {
+    return monitoring;
+  }
+
+  public void setMonitoring(MonitoringSpec monitoring) {
+    this.monitoring = monitoring;
+  }
+
+  public String getBatchScheduler() {
+    return batchScheduler;
+  }
+
+  public void setBatchScheduler(String batchScheduler) {
+    this.batchScheduler = batchScheduler;
+  }
+
+  public long getTimeToLiveSeconds() {
+    return timeToLiveSeconds;
+  }
+
+  public void setTimeToLiveSeconds(long timeToLiveSeconds) {
+    this.timeToLiveSeconds = timeToLiveSeconds;
+  }
+
+  public BatchSchedulerConfiguration getBatchSchedulerOptions() {
+    return batchSchedulerOptions;
+  }
+
+  public void setBatchSchedulerOptions(
+      BatchSchedulerConfiguration batchSchedulerOptions) {
+    this.batchSchedulerOptions = batchSchedulerOptions;
+  }
+
+  public SparkUIConfiguration getSparkUIOptions() {
+    return sparkUIOptions;
+  }
+
+  public void setSparkUIOptions(
+      SparkUIConfiguration sparkUIOptions) {
+    this.sparkUIOptions = sparkUIOptions;
+  }
+
+  public DynamicAllocation getDynamicAllocation() {
+    return dynamicAllocation;
+  }
+
+  public void setDynamicAllocation(
+      DynamicAllocation dynamicAllocation) {
+    this.dynamicAllocation = dynamicAllocation;
+  }
+
+  public void addConfToSparkConf(String name, String value) {
+    if (this.sparkConf == null) {
+      this.sparkConf = new HashMap<>();
+    }
+    sparkConf.put(name, value);
+  }
+
+  @Override
+  public String toString() {
+    return "SparkApplicationSpec{" +
+        "type='" + type + '\'' +
+        ", sparkVersion='" + sparkVersion + '\'' +
+        ", mode='" + mode + '\'' +
+        ", proxyUser='" + proxyUser + '\'' +
+        ", image='" + image + '\'' +
+        ", imagePullPolicy='" + imagePullPolicy + '\'' +
+        ", imagePullSecrets=" + imagePullSecrets +
+        ", mainClass='" + mainClass + '\'' +
+        ", mainApplicationFile='" + mainApplicationFile + '\'' +
+        ", arguments=" + arguments +
+        ", sparkConf=" + sparkConf +
+        ", hadoopConf=" + hadoopConf +
+        ", sparkConfigMap='" + sparkConfigMap + '\'' +
+        ", hadoopConfigMap='" + hadoopConfigMap + '\'' +
+        ", volumes=" + volumes +
+        ", driver=" + driver +
+        ", executor=" + executor +
+        ", deps=" + deps +
+        ", restartPolicy=" + restartPolicy +
+        ", nodeSelector=" + nodeSelector +
+        ", failureRetries=" + failureRetries +
+        ", retryInterval=" + retryInterval +
+        ", pythonVersion='" + pythonVersion + '\'' +
+        ", memoryOverheadFactor='" + memoryOverheadFactor + '\'' +
+        ", monitoring=" + monitoring +
+        ", batchScheduler='" + batchScheduler + '\'' +
+        ", timeToLiveSeconds=" + timeToLiveSeconds +
+        ", batchSchedulerOptions=" + batchSchedulerOptions +
+        ", sparkUIOptions=" + sparkUIOptions +
+        ", dynamicAllocation=" + dynamicAllocation +
+        '}';
+  }
+}
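
To give a feel for how the CRD pieces compose, here is a minimal sketch of assembling a spec with the classes and helper methods above; the image, main class and sizing values are illustrative, not defaults of this commit:

    SparkApplicationSpec spec = new SparkApplicationSpec();
    spec.setType("Scala");
    spec.setMode("cluster");
    spec.setSparkVersion("3.3.0");
    spec.setImage("my-registry/spark:3.3.0");
    spec.setMainClass("org.apache.spark.examples.SparkPi");
    spec.setMainApplicationFile("local:///opt/spark/examples/jars/spark-examples.jar");
    spec.setRestartPolicy(new RestartPolicy("Never"));
    spec.addConfToSparkConf("spark.ui.port", "4045"); // lazily initializes the sparkConf map

    DriverSpec driver = new DriverSpec();
    driver.setCores(1);
    driver.setMemory("1g");
    spec.setDriver(driver);

    ExecutorSpec executor = new ExecutorSpec();
    executor.setInstances(2);
    executor.setCores(2);
    executor.setMemory("2g");
    spec.setExecutor(executor);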
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplicationStatus.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplicationStatus.java
new file mode 100644
index 0000000000..cd755242ba
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkApplicationStatus.java
@@ -0,0 +1,151 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+import java.util.Map;
+
+@JsonDeserialize()
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkApplicationStatus implements KubernetesResource {
+
+
+  /**
+   * SparkApplicationID is set by the spark-distribution (via the spark.app.id config) on the
+   * driver and executor pods.
+   */
+  String sparkApplicationId;
+
+
+  /**
+   * SubmissionID is a unique ID of the current submission of the application.
+   */
+  String submissionID;
+
+
+  /**
+   * LastSubmissionAttemptTime is the time for the last application submission attempt.
+   */
+  String lastSubmissionAttemptTime;
+
+  /**
+   * CompletionTime is the time when the application runs to completion if it does.
+   */
+  String terminationTime;
+
+
+  /**
+   * DriverInfo has information about the driver.
+   */
+  DriverInfo driverInfo;
+
+  /**
+   * AppState tells the overall application state.
+   */
+  ApplicationState applicationState;
+
+  /**
+   * ExecutorState records the state of executors by executor Pod names.
+   */
+  Map<String, String> executorState;
+
+  /**
+   * ExecutionAttempts is the total number of attempts to run a submitted application to
+   * completion. Incremented upon each attempted run of the application and reset upon
+   * invalidation.
+   */
+  int executionAttempts;
+
+  /**
+   * SubmissionAttempts is the total number of attempts to submit an application to run.
+   * Incremented upon each attempted submission of the application and reset upon invalidation and
+   * rerun.
+   */
+  int submissionAttempts;
+
+  @Override
+  public String toString() {
+    return "SparkApplicationStatus{" +
+        "sparkApplicationId='" + sparkApplicationId + '\'' +
+        ", submissionID='" + submissionID + '\'' +
+        ", lastSubmissionAttemptTime='" + lastSubmissionAttemptTime + '\'' +
+        ", terminationTime='" + terminationTime + '\'' +
+        ", driverInfo=" + driverInfo +
+        ", applicationState=" + applicationState +
+        ", executorState='" + executorState + '\'' +
+        ", executionAttempts=" + executionAttempts +
+        ", submissionAttempts=" + submissionAttempts +
+        '}';
+  }
+
+  public String getSparkApplicationId() {
+    return sparkApplicationId;
+  }
+
+  public void setSparkApplicationId(String sparkApplicationId) {
+    this.sparkApplicationId = sparkApplicationId;
+  }
+
+  public String getSubmissionID() {
+    return submissionID;
+  }
+
+  public void setSubmissionID(String submissionID) {
+    this.submissionID = submissionID;
+  }
+
+  public String getLastSubmissionAttemptTime() {
+    return lastSubmissionAttemptTime;
+  }
+
+  public void setLastSubmissionAttemptTime(String lastSubmissionAttemptTime) {
+    this.lastSubmissionAttemptTime = lastSubmissionAttemptTime;
+  }
+
+  public String getTerminationTime() {
+    return terminationTime;
+  }
+
+  public void setTerminationTime(String terminationTime) {
+    this.terminationTime = terminationTime;
+  }
+
+  public DriverInfo getDriverInfo() {
+    return driverInfo;
+  }
+
+  public void setDriverInfo(DriverInfo driverInfo) {
+    this.driverInfo = driverInfo;
+  }
+
+  public ApplicationState getApplicationState() {
+    return applicationState;
+  }
+
+  public void setApplicationState(
+      ApplicationState applicationState) {
+    this.applicationState = applicationState;
+  }
+
+  public Map<String, String> getExecutorState() {
+    return executorState;
+  }
+
+  public void setExecutorState(Map<String, String> executorState) {
+    this.executorState = executorState;
+  }
+
+  public int getExecutionAttempts() {
+    return executionAttempts;
+  }
+
+  public void setExecutionAttempts(int executionAttempts) {
+    this.executionAttempts = executionAttempts;
+  }
+
+  public int getSubmissionAttempts() {
+    return submissionAttempts;
+  }
+
+  public void setSubmissionAttempts(int submissionAttempts) {
+    this.submissionAttempts = submissionAttempts;
+  }
+}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkUIConfiguration.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkUIConfiguration.java
new file mode 100644
index 0000000000..f299f7be83
--- /dev/null
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/crd/SparkUIConfiguration.java
@@ -0,0 +1,94 @@
+package org.apache.dolphinscheduler.plugin.task.spark.crd;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
+import java.util.List;
+import java.util.Map;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SparkUIConfiguration {
+
+  /**
+   * (Optional) ServicePort allows configuring the port at service level that might be different
+   * from the targetPort. TargetPort should be the same as the one defined in spark.ui.port.
+   */
+  Integer servicePort;
+
+  /**
+   * (Optional) ServicePortName allows configuring the name of the service port. This may be
+   * useful for sidecar proxies like Envoy injected by Istio which require specific port names to
+   * treat traffic as proper HTTP. Defaults to spark-driver-ui-port.
+   */
+  String servicePortName;
+
+  /**
+   * (Optional) ServiceType allows configuring the type of the service. Defaults to ClusterIP.
+   */
+  String serviceType;
+
+  /**
+   * (Optional) ServiceAnnotations is a map of key,value pairs of annotations that might be added
+   * to the service object.
+   */
+  Map<String, String> serviceAnnotations;
+
+  /**
+   * (Optional) IngressAnnotations is a map of key,value pairs of annotations that might be added
+   * to the ingress object, e.g. specify nginx as ingress.class.
+   */
+  Map<String, String> ingressAnnotations;
+
+  /**
+   * (Optional) IngressTLS is useful if we need to declare SSL certificates for the ingress
+   * object.
+   */
+  List<IngressTLS> ingressTLS;
+
+  public Integer getServicePort() {
+    return servicePort;
+  }
+
+  public void setServicePort(Integer servicePort) {
+    this.servicePort = servicePort;
+  }
+
+  public String getServicePortName() {
+    return servicePortName;
+  }
+
+  public void setServicePortName(String servicePortName) {
+    this.servicePortName = servicePortName;
+  }
+
+  public String getServiceType() {
+    return serviceType;
+  }
+
+  public void setServiceType(String serviceType) {
+    this.serviceType = serviceType;
+  }
+
+  public Map<String, String> getServiceAnnotations() {
+    return serviceAnnotations;
+  }
+
+  public void setServiceAnnotations(Map<String, String> serviceAnnotations) {
+    this.serviceAnnotations = serviceAnnotations;
+  }
+
+  public Map<String, String> getIngressAnnotations() {
+    return ingressAnnotations;
+  }
+
+  public void setIngressAnnotations(Map<String, String> ingressAnnotations) {
+    this.ingressAnnotations = ingressAnnotations;
+  }
+
+  public List<IngressTLS> getIngressTLS() {
+    return ingressTLS;
+  }
+
+  public void setIngressTLS(
+      List<IngressTLS> ingressTLS) {
+    this.ingressTLS = ingressTLS;
+  }
+}
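
A short sketch of populating the UI options; the annotation key and value are illustrative and depend on the ingress controller deployed in the cluster (hypothetical usage, not part of this commit):

    SparkUIConfiguration ui = new SparkUIConfiguration();
    ui.setServicePort(4045); // should match spark.ui.port
    ui.setServiceType("ClusterIP");
    ui.setIngressAnnotations(
        java.util.Collections.singletonMap("kubernetes.io/ingress.class", "nginx"));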
