[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs

2020-02-12 Thread GitBox
nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs
URL: https://github.com/apache/airflow/pull/7400#discussion_r378238724
 
 

 ##
 File path: airflow/providers/google/cloud/operators/mlengine.py
 ##
 @@ -1015,3 +1015,58 @@ def check_existing_job(existing_job):
         if finished_training_job['state'] != 'SUCCEEDED':
             self.log.error('MLEngine training job failed: %s', str(finished_training_job))
             raise RuntimeError(finished_training_job['errorMessage'])
+
+
+class MLEngineTrainingJobFailureOperator(BaseOperator):
+
+    """
+    Operator for cleaning up failed MLEngine training job.
+
+    :param job_id: A unique templated id for the submitted Google MLEngine
+        training job. (templated)
+    :type job_id: str
+    :param project_id: The Google Cloud project name within which MLEngine training job should run.
+        If set to None or missing, the default project_id from the GCP connection is used. (templated)
+    :type project_id: str
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = [
+        '_project_id',
+        '_job_id',
+    ]
 
 Review comment:
   It varies, but in my opinion this is an immutable field :)
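The immutability argument can be shown in plain Python, independently of Airflow. A class attribute holding a list is one shared object, so an in-place change made through a subclass also changes the parent class. A tuple cannot be changed in place, so "extending" it binds a new tuple on the subclass only. The class names below are hypothetical stand-ins, not Airflow operators.

```python
# Hypothetical stand-ins for an operator hierarchy; not real Airflow classes.
class ListFieldsOperator:
    template_fields = ['_project_id', '_job_id']


class ListFieldsChild(ListFieldsOperator):
    pass  # inherits the *same* list object from the parent


class TupleFieldsOperator:
    template_fields = ('_project_id', '_job_id')


class TupleFieldsChild(TupleFieldsOperator):
    pass  # inherits the parent's tuple until it rebinds its own


# Mutating the inherited list in place silently changes the parent too.
ListFieldsChild.template_fields.append('_extra')

# Tuples have no in-place add: this builds a new tuple and binds it on the
# subclass only, leaving the parent's attribute untouched.
TupleFieldsChild.template_fields += ('_extra',)
```

With a tuple, a subclass that wants extra templated fields has to rebind its own attribute. This is the behaviour the reviewer's "immutable field" remark points towards.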


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs

2020-02-12 Thread GitBox
nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs
URL: https://github.com/apache/airflow/pull/7400#discussion_r378238926
 
 

 ##
 File path: airflow/providers/google/cloud/operators/mlengine.py
 ##
 @@ -1015,3 +1015,58 @@ def check_existing_job(existing_job):
         if finished_training_job['state'] != 'SUCCEEDED':
             self.log.error('MLEngine training job failed: %s', str(finished_training_job))
             raise RuntimeError(finished_training_job['errorMessage'])
+
+
+class MLEngineTrainingJobFailureOperator(BaseOperator):
+
+    """
+    Operator for cleaning up failed MLEngine training job.
+
+    :param job_id: A unique templated id for the submitted Google MLEngine
+        training job. (templated)
+    :type job_id: str
+    :param project_id: The Google Cloud project name within which MLEngine training job should run.
+        If set to None or missing, the default project_id from the GCP connection is used. (templated)
+    :type project_id: str
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = [
+        '_project_id',
+        '_job_id',
+    ]
+
+    @apply_defaults
+    def __init__(self,
+                 job_id: str,
+                 project_id: Optional[str] = None,
+                 gcp_conn_id: str = 'google_cloud_default',
+                 delegate_to: Optional[str] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self._project_id = project_id
+        self._job_id = job_id
+        self._gcp_conn_id = gcp_conn_id
+        self._delegate_to = delegate_to
+
+        if not self._project_id:
+            raise AirflowException('Google Cloud project id is required.')
+        if not self._job_id:
+            raise AirflowException(
 
 Review comment:
   MLEngineOperators are not the best ones :D




[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs

2020-02-11 Thread GitBox
nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs
URL: https://github.com/apache/airflow/pull/7400#discussion_r377745677
 
 

 ##
 File path: airflow/providers/google/cloud/operators/mlengine.py
 ##
 @@ -1015,3 +1015,58 @@ def check_existing_job(existing_job):
         if finished_training_job['state'] != 'SUCCEEDED':
             self.log.error('MLEngine training job failed: %s', str(finished_training_job))
             raise RuntimeError(finished_training_job['errorMessage'])
+
+
+class MLEngineTrainingJobFailureOperator(BaseOperator):
+
+    """
+    Operator for cleaning up failed MLEngine training job.
+
+    :param job_id: A unique templated id for the submitted Google MLEngine
+        training job. (templated)
+    :type job_id: str
+    :param project_id: The Google Cloud project name within which MLEngine training job should run.
+        If set to None or missing, the default project_id from the GCP connection is used. (templated)
+    :type project_id: str
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = [
+        '_project_id',
+        '_job_id',
+    ]
+
+    @apply_defaults
+    def __init__(self,
+                 job_id: str,
+                 project_id: Optional[str] = None,
+                 gcp_conn_id: str = 'google_cloud_default',
+                 delegate_to: Optional[str] = None,
+                 *args,
+                 **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self._project_id = project_id
+        self._job_id = job_id
+        self._gcp_conn_id = gcp_conn_id
+        self._delegate_to = delegate_to
+
+        if not self._project_id:
+            raise AirflowException('Google Cloud project id is required.')
+        if not self._job_id:
+            raise AirflowException(
 
 Review comment:
   ```suggestion
   
   ```
   No need for that, as `job_id` is a required parameter.
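Why the explicit check is redundant: a parameter without a default value is enforced by the call itself. Omitting it fails before the constructor body ever runs. This is a minimal sketch using a hypothetical class that models only the constructor signature, without Airflow's `@apply_defaults`.

```python
from typing import Optional


class CancelJobOperatorSketch:
    """Hypothetical stand-in that models only the constructor signature."""

    def __init__(self, job_id: str, project_id: Optional[str] = None) -> None:
        # No explicit "job_id is required" check: job_id has no default
        # value, so Python itself rejects any call that omits it.
        self._job_id = job_id
        self._project_id = project_id


missing_job_id_error = None
try:
    CancelJobOperatorSketch(project_id='my-project')  # job_id omitted
except TypeError as err:
    missing_job_id_error = str(err)  # message names the missing 'job_id'
```

A required parameter only guards against omission. An explicitly passed empty string or `None` still reaches the constructor body. Under `@apply_defaults`, a missing argument may also surface as an `AirflowException` rather than a `TypeError`, depending on the Airflow version.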




[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs

2020-02-11 Thread GitBox
nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs
URL: https://github.com/apache/airflow/pull/7400#discussion_r377745225
 
 

 ##
 File path: airflow/providers/google/cloud/operators/mlengine.py
 ##
 @@ -1015,3 +1015,58 @@ def check_existing_job(existing_job):
         if finished_training_job['state'] != 'SUCCEEDED':
             self.log.error('MLEngine training job failed: %s', str(finished_training_job))
             raise RuntimeError(finished_training_job['errorMessage'])
+
+
+class MLEngineTrainingJobFailureOperator(BaseOperator):
+
+    """
+    Operator for cleaning up failed MLEngine training job.
+
+    :param job_id: A unique templated id for the submitted Google MLEngine
+        training job. (templated)
+    :type job_id: str
+    :param project_id: The Google Cloud project name within which MLEngine training job should run.
+        If set to None or missing, the default project_id from the GCP connection is used. (templated)
+    :type project_id: str
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :type gcp_conn_id: str
+    :param delegate_to: The account to impersonate, if any.
+        For this to work, the service account making the request must have
+        domain-wide delegation enabled.
+    :type delegate_to: str
+    """
+
+    template_fields = [
+        '_project_id',
+        '_job_id',
+    ]
 
 Review comment:
   Let's use a tuple here :)
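With the suggestion applied, `template_fields` becomes a tuple of attribute names. The names carry the leading underscore because they must match the attributes set in `__init__` (`self._project_id`, `self._job_id`). This simplified sketch shows how such names are consumed: each listed attribute is read with `getattr`, rendered, and written back with `setattr`. Airflow renders with Jinja. `string.Template` is swapped in here only to keep the example dependency-free. The class and its `render_template_fields` method are hypothetical.

```python
from string import Template


class CancelJobSketch:
    # Suggested form: an immutable tuple of attribute names to template.
    template_fields = ('_project_id', '_job_id')

    def __init__(self, job_id: str, project_id: str) -> None:
        self._job_id = job_id
        self._project_id = project_id

    def render_template_fields(self, context: dict) -> None:
        # Look up each named attribute, render it, and store the result back.
        # (Airflow uses Jinja here; string.Template is only a stand-in.)
        for attr in self.template_fields:
            value = getattr(self, attr)
            setattr(self, attr, Template(value).safe_substitute(context))


op = CancelJobSketch(job_id='training_$ds_nodash', project_id='my-project')
op.render_template_fields({'ds_nodash': '20200211'})
```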




[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs

2020-02-11 Thread GitBox
nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs
URL: https://github.com/apache/airflow/pull/7400#discussion_r377744740
 
 

 ##
 File path: airflow/providers/google/cloud/hooks/mlengine.py
 ##
 @@ -148,6 +148,51 @@ def create_job(
 
         return self._wait_for_job_done(project_id, job_id)
 
+    @CloudBaseHook.fallback_to_default_project_id
+    def cancel_job(
+        self,
+        job_id,
+        project_id: Optional[str] = None
+    ) -> Dict:
+
+        """
+        Cancels a MLEngine job.
+
+        :param project_id: The Google Cloud project id within which MLEngine
+            job will be launched. If set to None or missing, the default project_id from the GCP
+            connection is used.
+        :type project_id: str
+        :param job_id: A unique id for the want-to-be cancelled Google MLEngine training job.
+        :type job_id: str
+
+        :return: Empty dict if cancelled successfully
+        :rtype: dict
+        :raises: googleapiclient.errors.HttpError
+        """
+
+        if not project_id:
+            raise ValueError("The project_id should be set")
+
+        hook = self.get_conn()
+
+        request = hook.projects().jobs().cancel(  # pylint: disable=no-member
+            name='projects/{}/jobs/{}'.format(project_id, job_id))
 
 Review comment:
   ```suggestion
   name=f'projects/{project_id}/jobs/{job_id}')
   ```
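The suggested f-string builds exactly the same job resource name as the original `str.format` call. It is shorter and keeps each value next to its placeholder. f-strings require Python 3.6 or newer. A quick equivalence check, using a hypothetical helper name:

```python
def job_resource_name(project_id: str, job_id: str) -> str:
    """Hypothetical helper: the resource name passed as ``name=`` to cancel()."""
    return f'projects/{project_id}/jobs/{job_id}'


# The original formulation from the diff, for comparison.
legacy_name = 'projects/{}/jobs/{}'.format('my-project', 'my_job')
new_name = job_resource_name('my-project', 'my_job')
```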




[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs

2020-02-11 Thread GitBox
nuclearpinguin commented on a change in pull request #7400: [AIRFLOW-6759] Added MLEngine operator/hook to cancel MLEngine jobs
URL: https://github.com/apache/airflow/pull/7400#discussion_r377737240
 
 

 ##
 File path: airflow/providers/google/cloud/hooks/mlengine.py
 ##
 @@ -148,6 +148,51 @@ def create_job(
 
         return self._wait_for_job_done(project_id, job_id)
 
+    @CloudBaseHook.fallback_to_default_project_id
+    def cancel_job(
+        self,
+        job_id,
 
 Review comment:
   ```suggestion
   job_id: str,
   ```
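For context, here is a minimal sketch of `cancel_job` with `job_id` annotated as suggested. It runs against a hypothetical in-memory stub that mimics the `projects().jobs().cancel(name=...).execute()` call chain used in the diff. The stub classes and the standalone function are assumptions for illustration. The method in the PR lives on the hook and obtains its client from `self.get_conn()`. It relies on `@CloudBaseHook.fallback_to_default_project_id` to fill in `project_id`, and the real client may raise `googleapiclient.errors.HttpError`.

```python
from typing import Any, Dict, List, Optional


class _StubRequest:
    """Hypothetical stand-in for a googleapiclient request object."""

    def __init__(self, calls: List[str], name: str) -> None:
        self._calls = calls
        self._name = name

    def execute(self) -> Dict[str, Any]:
        self._calls.append(self._name)  # record which job was cancelled
        return {}  # per the diff's docstring: empty dict on success


class _StubJobs:
    def __init__(self, calls: List[str]) -> None:
        self._calls = calls

    def cancel(self, name: str) -> _StubRequest:
        return _StubRequest(self._calls, name)


class _StubProjects:
    def __init__(self, calls: List[str]) -> None:
        self._calls = calls

    def jobs(self) -> _StubJobs:
        return _StubJobs(self._calls)


class StubMLEngineClient:
    """Mimics client.projects().jobs().cancel(name=...).execute()."""

    def __init__(self) -> None:
        self.cancelled: List[str] = []

    def projects(self) -> _StubProjects:
        return _StubProjects(self.cancelled)


def cancel_job(client: Any, job_id: str, project_id: Optional[str] = None) -> Dict[str, Any]:
    """Standalone sketch of the hook method, with job_id annotated as str."""
    if not project_id:
        raise ValueError("The project_id should be set")
    request = client.projects().jobs().cancel(
        name=f'projects/{project_id}/jobs/{job_id}')
    return request.execute()


client = StubMLEngineClient()
result = cancel_job(client, job_id='my_job', project_id='my-project')
```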

