[jira] [Commented] (AIRFLOW-2658) Add GKE specific Kubernetes Pod Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567410#comment-16567410 ]

ASF subversion and git services commented on AIRFLOW-2658:
----------------------------------------------------------

Commit b4f43e6c48b976a873448a3cc8a1490efe38a8e8 in incubator-airflow's branch refs/heads/master from [~noremac201]
[ https://gitbox.apache.org/repos/asf?p=incubator-airflow.git;h=b4f43e6 ]

[AIRFLOW-2658] Add GCP specific k8s pod operator (#3532)

Executes a task in a Kubernetes pod in the specified Google Kubernetes Engine cluster. This makes it easier to interact with the GCP Kubernetes Engine service because it encapsulates acquiring credentials.

> Add GKE specific Kubernetes Pod Operator
> ----------------------------------------
>
>          Key: AIRFLOW-2658
>          URL: https://issues.apache.org/jira/browse/AIRFLOW-2658
>      Project: Apache Airflow
>   Issue Type: New Feature
>     Reporter: Cameron Moberg
>     Assignee: Cameron Moberg
>     Priority: Minor
>
> Currently there is a Kubernetes Pod operator, but it is not easy to
> have it work with GCP Kubernetes Engine, so it would be nice to have one.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
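As a hedged illustration of how the committed operator might be wired into a DAG: the sketch below is built from the docstring example later in this thread, and every concrete value (project, zone, cluster, image, DAG id) is a placeholder, not something taken from the commit.

```python
# Minimal DAG sketch using the GKEPodOperator added by this commit.
# All identifiers (project, zone, cluster, image, dag id) are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.gcp_container_operator import GKEPodOperator

dag = DAG('gke_pod_example',
          start_date=datetime(2018, 8, 1),
          schedule_interval=None)

run_in_gke = GKEPodOperator(
    task_id='pod_op',
    project_id='my-project',         # GCP project that owns the cluster
    location='us-central1-a',        # zone where the cluster resides
    cluster_name='my-cluster-name',  # existing GKE cluster
    name='task-name',                # name given to the spawned pod
    namespace='default',
    image='perl',                    # container image to run in the pod
    dag=dag)
```

Because the operator acquires cluster credentials itself, the DAG author does not have to manage a kubeconfig for the cluster, only a GCP connection (or default application credentials) on the Airflow worker.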
[jira] [Commented] (AIRFLOW-2658) Add GKE specific Kubernetes Pod Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567409#comment-16567409 ]

ASF GitHub Bot commented on AIRFLOW-2658:
-----------------------------------------

kaxil closed pull request #3532: [AIRFLOW-2658] Add GCP specific k8s pod operator
URL: https://github.com/apache/incubator-airflow/pull/3532

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/airflow/contrib/operators/gcp_container_operator.py b/airflow/contrib/operators/gcp_container_operator.py
index 5648b4d8a0..615eac8a0f 100644
--- a/airflow/contrib/operators/gcp_container_operator.py
+++ b/airflow/contrib/operators/gcp_container_operator.py
@@ -17,8 +17,13 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+import os
+import subprocess
+import tempfile
+
 from airflow import AirflowException
 from airflow.contrib.hooks.gcp_container_hook import GKEClusterHook
+from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -170,3 +175,147 @@ def execute(self, context):
         hook = GKEClusterHook(self.project_id, self.location)
         create_op = hook.create_cluster(cluster=self.body)
         return create_op
+
+
+KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
+G_APP_CRED = "GOOGLE_APPLICATION_CREDENTIALS"
+
+
+class GKEPodOperator(KubernetesPodOperator):
+    template_fields = ('project_id', 'location',
+                       'cluster_name') + KubernetesPodOperator.template_fields
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 location,
+                 cluster_name,
+                 gcp_conn_id='google_cloud_default',
+                 *args,
+                 **kwargs):
+        """
+        Executes a task in a Kubernetes pod in the specified Google Kubernetes
+        Engine cluster.
+
+        This Operator assumes that the system has gcloud installed and either
+        has working default application credentials or has configured a
+        connection id with a service account.
+
+        The **minimum** required to define a cluster to create are the variables
+        ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
+        ``namespace``, and ``image``.
+
+        **Operator Creation**: ::
+
+            operator = GKEPodOperator(task_id='pod_op',
+                                      project_id='my-project',
+                                      location='us-central1-a',
+                                      cluster_name='my-cluster-name',
+                                      name='task-name',
+                                      namespace='default',
+                                      image='perl')
+
+        .. seealso::
+            For more detail about application authentication have a look at the reference:
+            https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application
+
+        :param project_id: The Google Developers Console project id
+        :type project_id: str
+        :param location: The name of the Google Kubernetes Engine zone in which the
+            cluster resides, e.g. 'us-central1-a'
+        :type location: str
+        :param cluster_name: The name of the Google Kubernetes Engine cluster the pod
+            should be spawned in
+        :type cluster_name: str
+        :param gcp_conn_id: The google cloud connection id to use. This allows for
+            users to specify a service account.
+        :type gcp_conn_id: str
+        """
+        super(GKEPodOperator, self).__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.cluster_name = cluster_name
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context):
+        # Specifying a service account file allows the user to use non-default
+        # authentication for creating a Kubernetes Pod. This is done by setting the
+        # environment variable `GOOGLE_APPLICATION_CREDENTIALS` that gcloud looks at.
+        key_file = None
+
+        # If gcp_conn_id is not specified gcloud will use the default
+        # service account credentials.
+        if self.gcp_conn_id:
+            from airflow.hooks.base_hook import BaseHook
+            # extras is a deserialized json object
+            extras = BaseHook.get_connection(self.gcp_conn_id).extra_dejson
+            # key_file only gets set if a json file is created from a JSON string in
+            # the web ui, else none
+            key_file = self._set_env_from_extras(extras=extras)
+
+        # Write config to a temp
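The credential flow the diff implements (point ``KUBECONFIG`` at a fresh temp file, then let an external command write cluster credentials into it) can be sketched with the standard library alone. ``fake_cmd`` below stands in for the real ``gcloud container clusters get-credentials`` invocation and is purely illustrative:

```python
import os
import subprocess
import sys
import tempfile

KUBE_CONFIG_ENV_VAR = "KUBECONFIG"


def fetch_cluster_config(get_credentials_cmd):
    """Run a credential-fetching command with KUBECONFIG pointed at a fresh
    temp file, and return the config it wrote.

    Mirrors the pattern in the diff: a per-task temp file avoids races on a
    shared kubeconfig when several tasks fetch credentials concurrently.
    """
    with tempfile.NamedTemporaryFile() as conf_file:
        os.environ[KUBE_CONFIG_ENV_VAR] = conf_file.name
        # The real operator calls `gcloud container clusters get-credentials`
        # here; gcloud honours KUBECONFIG when deciding where to write the
        # kubernetes config file.
        subprocess.check_call(get_credentials_cmd)
        return open(conf_file.name).read()


# Stand-in for gcloud: a tiny Python process that writes to $KUBECONFIG.
fake_cmd = [sys.executable, "-c",
            "import os; open(os.environ['KUBECONFIG'], 'w')"
            ".write('apiVersion: v1\\n')"]
config = fetch_cluster_config(fake_cmd)
```

The child process inherits the mutated environment because ``subprocess.check_call`` passes the current ``os.environ`` through by default; the temp file is deleted when the ``with`` block exits, after the config has been read.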
[jira] [Commented] (AIRFLOW-2658) Add GKE specific Kubernetes Pod Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565575#comment-16565575 ]

ASF GitHub Bot commented on AIRFLOW-2658:
-----------------------------------------

Noremac201 commented on issue #3532: [AIRFLOW-2658] Add GCP specific k8s pod operator
URL: https://github.com/apache/incubator-airflow/pull/3532#issuecomment-409633871

Looks like Travis isn't posting, here's my personal Travis build: https://travis-ci.org/Noremac201/incubator-airflow/builds/410543165

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-2658) Add GKE specific Kubernetes Pod Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564402#comment-16564402 ]

ASF GitHub Bot commented on AIRFLOW-2658:
-----------------------------------------

fenglu-g commented on issue #3532: [AIRFLOW-2658] Add GCP specific k8s pod operator
URL: https://github.com/apache/incubator-airflow/pull/3532#issuecomment-409378846

@Noremac201 please fix travis-ci, thanks.
[jira] [Commented] (AIRFLOW-2658) Add GKE specific Kubernetes Pod Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564085#comment-16564085 ]

ASF GitHub Bot commented on AIRFLOW-2658:
-----------------------------------------

fenglu-g commented on a change in pull request #3532: [AIRFLOW-2658] Add GCP specific k8s pod operator
URL: https://github.com/apache/incubator-airflow/pull/3532#discussion_r206629560

## File path: airflow/contrib/operators/gcp_container_operator.py

##
@@ -170,3 +175,147 @@ def execute(self, context):
         hook = GKEClusterHook(self.project_id, self.location)
         create_op = hook.create_cluster(cluster=self.body)
         return create_op
+
+
+KUBE_CONFIG_ENV_VAR = "KUBECONFIG"
+G_APP_CRED = "GOOGLE_APPLICATION_CREDENTIALS"
+
+
+class GKEPodOperator(KubernetesPodOperator):
+    template_fields = ('project_id', 'location',
+                       'cluster_name') + KubernetesPodOperator.template_fields
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 location,
+                 cluster_name,
+                 gcp_conn_id='google_cloud_default',
+                 *args,
+                 **kwargs):
+        """
+        Executes a task in a Kubernetes pod in the specified Google Kubernetes
+        Engine cluster.
+
+        This Operator assumes that the system has gcloud installed and either
+        has working default application credentials or has configured a
+        connection id with a service account.
+
+        The **minimum** required to define a cluster to create are the variables
+        ``task_id``, ``project_id``, ``location``, ``cluster_name``, ``name``,
+        ``namespace``, and ``image``.
+
+        **Operator Creation**: ::
+
+            operator = GKEPodOperator(task_id='pod_op',
+                                      project_id='my-project',
+                                      location='us-central1-a',
+                                      cluster_name='my-cluster-name',
+                                      name='task-name',
+                                      namespace='default',
+                                      image='perl')
+
+        .. seealso::
+            For more detail about application authentication have a look at the reference:
+            https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application
+
+        :param project_id: The Google Developers Console project id
+        :type project_id: str
+        :param location: The name of the Google Kubernetes Engine zone in which the
+            cluster resides, e.g. 'us-central1-a'
+        :type location: str
+        :param cluster_name: The name of the Google Kubernetes Engine cluster the pod
+            should be spawned in
+        :type cluster_name: str
+        :param gcp_conn_id: The google cloud connection id to use. This allows for
+            users to specify a service account.
+        :type gcp_conn_id: str
+        """
+        super(GKEPodOperator, self).__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.location = location
+        self.cluster_name = cluster_name
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context):
+        # Specifying a service account file allows the user to use non-default
+        # authentication for creating a Kubernetes Pod. This is done by setting the
+        # environment variable `GOOGLE_APPLICATION_CREDENTIALS` that gcloud looks at.
+        key_file = None
+
+        # If gcp_conn_id is not specified gcloud will use the default
+        # service account credentials.
+        if self.gcp_conn_id:
+            from airflow.hooks.base_hook import BaseHook
+            # extras is a deserialized json object
+            extras = BaseHook.get_connection(self.gcp_conn_id).extra_dejson
+            # key_file only gets set if a json file is created from a JSON string in
+            # the web ui, else none
+            key_file = self._set_env_from_extras(extras=extras)
+
+        # Write config to a temp file and set the environment variable to point to it.
+        # This is to avoid race conditions of reading/writing a single file
+        with tempfile.NamedTemporaryFile() as conf_file:
+            os.environ[KUBE_CONFIG_ENV_VAR] = conf_file.name
+            # Attempt to get/update credentials
+            # We call gcloud directly instead of using google-cloud-python api
+            # because there is no way to write kubernetes config to a file, which is
+            # required by KubernetesPodOperator.
+            # The gcloud command looks at the env variable `KUBECONFIG` for where to save
+            # the kubernetes config file.
+            subprocess.check_call(
+                ["gcloud", "container", "clusters", "get-credentials",
+                 self.cluster_name,
+                 "--zone", self.location,
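The quoted code calls a helper ``_set_env_from_extras`` whose body is not shown in this (truncated) message. The sketch below is a guess at the mechanism it implies: if the connection extras carry a service-account key as a JSON string, write it to a temp file and point ``GOOGLE_APPLICATION_CREDENTIALS`` at it so gcloud uses non-default credentials. The extras key name ``extra__google_cloud_platform__keyfile_dict`` and the helper name are assumptions, and a plain dict stands in for the connection's ``extra_dejson``:

```python
import json
import os
import tempfile

G_APP_CRED = "GOOGLE_APPLICATION_CREDENTIALS"
# Assumed extras key; stand-in for whatever the real helper reads.
KEYFILE_DICT_KEY = "extra__google_cloud_platform__keyfile_dict"


def set_env_from_extras(extras):
    """If the connection extras carry a service-account key as a JSON string,
    write it to a temp file and export GOOGLE_APPLICATION_CREDENTIALS so that
    gcloud picks up non-default credentials.

    Returns the key file path, or None when no key is configured (in which
    case default application credentials apply).
    """
    key_json = extras.get(KEYFILE_DICT_KEY)
    if not key_json:
        return None  # fall back to default application credentials
    # delete=False: the file must outlive this function for gcloud to read it.
    key_file = tempfile.NamedTemporaryFile(mode='w', suffix='.json',
                                           delete=False)
    key_file.write(key_json)
    key_file.close()
    os.environ[G_APP_CRED] = key_file.name
    return key_file.name


# No key configured: default credentials are used.
no_key = set_env_from_extras({})

# A dummy key JSON gets written out and exported.
path = set_env_from_extras(
    {KEYFILE_DICT_KEY: json.dumps({"type": "service_account"})})
```

Writing the key to a per-task temp file (rather than a fixed path) matches the diff's stated goal of avoiding read/write races between concurrently executing tasks.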