Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
henry3260 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3879095485
> LGTM! I'll merge if and when the CI is green. While the CI is running,
please try to avoid making additional changes so I could merge it right
after it (hopefully) ends successfully. Thanks <3
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
henry3260 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3879077546
> Good day @henry3260,
>
> Happy New Year and thank you so much for taking the initiative to add this
feature. It will be helpful.
>
> I would like to clarify a specific scenario regarding a Glue job named
`glue_job_database_name_1`. This job is designed to handle a single schema but
uses a `tbl_name` argument to process different tables dynamically. The script
logic adapts based on the table name passed during execution.
>
> Assuming one DAG with three GlueJobOperator tasks calling the same Glue job
name `glue_job_database_name_1` in parallel.
>
> Assuming `task_id="table_1"` and `task_id="table_2"` are still running
Glue jobs. If `task_id="table_3"` suddenly fails due to some internal error
and retries, will it be able to find the same `previous_glue_job_id`
and avoid creating a new Glue job?
>
> ```
> table_1 = GlueJobOperator(
>     task_id="table_1",
>     job_name="glue_job_database_name_1",
>     verbose=False,
>     script_args={
>         "--tbl_name": "table_1",
>     },
>     resume_glue_job_on_retry=True,
>     retry_limit=3,
> )
>
> table_2 = GlueJobOperator(
>     task_id="table_2",
>     job_name="glue_job_database_name_1",
>     verbose=False,
>     script_args={
>         "--tbl_name": "table_2",
>     },
>     resume_glue_job_on_retry=True,
>     retry_limit=3,
> )
>
> table_3 = GlueJobOperator(
>     task_id="table_3",
>     job_name="glue_job_database_name_1",
>     verbose=False,
>     script_args={
>         "--tbl_name": "table_3",
>     },
>     resume_glue_job_on_retry=True,
>     retry_limit=3,
> )
> ```
>
> Thanks and let me know if you need further clarification
Hi @wilsonhooi86! Yes, it will find the same `previous_glue_job_id` and will
not create a new Glue job: when GlueJobOperator retries, it only looks up its
own `glue_job_run_id` for that specific `task_id`.
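A minimal sketch of that per-task scoping (the dict-backed store and helpers below are a hypothetical stand-in for Airflow's XCom table, which in reality is also keyed by dag_id and run_id; they are not code from the PR):

```python
# Hypothetical in-memory stand-in for XCom storage, keyed by (task_id, key),
# illustrating why parallel tasks sharing one Glue job name never pick up
# each other's run IDs.
xcom_store = {}

def xcom_push(task_id, key, value):
    xcom_store[(task_id, key)] = value

def xcom_pull(task_id, key):
    return xcom_store.get((task_id, key))

# Three parallel tasks run the same Glue job name but push distinct run IDs.
for task_id, run_id in [("table_1", "jr_aaa"), ("table_2", "jr_bbb"), ("table_3", "jr_ccc")]:
    xcom_push(task_id, "glue_job_run_id", run_id)

# On retry, table_3 pulls only the run ID it pushed itself.
resumed = xcom_pull("table_3", "glue_job_run_id")
```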
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
henry3260 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2788760876
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+        if previous_job_run_id:
+            try:
+                job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                state = job_run.get("JobRun", {}).get("JobRunState")
+                self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                if state in ("RUNNING", "STARTING", "STOPPING"):
+                    self._job_run_id = previous_job_run_id
+            except Exception as e:
+                self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+        ti = context.get("ti")
+        if ti:
+            ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)
Review Comment:
> Clarification - don't feel pressured at any stance, but if it's only a
matter of small fixes I'll be happy to include it in this release :)

Yeah, I will push soon. Thanks for your kind words!
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
shahar1 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2788659592
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+        if previous_job_run_id:
+            try:
+                job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                state = job_run.get("JobRun", {}).get("JobRunState")
+                self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                if state in ("RUNNING", "STARTING", "STOPPING"):
+                    self._job_run_id = previous_job_run_id
+            except Exception as e:
+                self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+        ti = context.get("ti")
+        if ti:
+            ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)
Review Comment:
Clarification - don't feel pressured at any stance, but if it's only a
matter of small fixes I'll be happy to include it in this release :)
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
shahar1 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2788653826
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+        if previous_job_run_id:
+            try:
+                job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                state = job_run.get("JobRun", {}).get("JobRunState")
+                self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                if state in ("RUNNING", "STARTING", "STOPPING"):
+                    self._job_run_id = previous_job_run_id
+            except Exception as e:
+                self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+        ti = context.get("ti")
+        if ti:
+            ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)
Review Comment:
> > This seems overly defensive to me; `context["ti"]` should always be
available.
>
> Applied!
Thanks! Are you pushing soon the changes? (I'm cutting the release in an
hour or so)
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
henry3260 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2788630351
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+        if previous_job_run_id:
+            try:
+                job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                state = job_run.get("JobRun", {}).get("JobRunState")
+                self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                if state in ("RUNNING", "STARTING", "STOPPING"):
+                    self._job_run_id = previous_job_run_id
+            except Exception as e:
+                self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+        ti = context.get("ti")
+        if ti:
+            ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)
Review Comment:
> This seems overly defensive to me; `context["ti"]` should always be
available.
Applied!
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
henry3260 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3877553837
> @henry3260 Could you please address the open issues?

Sorry for the late update. I'll address them shortly.
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
shahar1 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3877529874
@henry3260 Could you please address the open issues?
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
uranusjr commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2674290887
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+        if previous_job_run_id:
+            try:
+                job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                state = job_run.get("JobRun", {}).get("JobRunState")
+                self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                if state in ("RUNNING", "STARTING", "STOPPING"):
+                    self._job_run_id = previous_job_run_id
+            except Exception as e:
+                self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+        ti = context.get("ti")
+        if ti:
+            ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)
Review Comment:
This seems overly defensive to me; `context["ti"]` should always be
available.
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
uranusjr commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2674289583
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+        if previous_job_run_id:
+            try:
+                job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                state = job_run.get("JobRun", {}).get("JobRunState")
+                self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                if state in ("RUNNING", "STARTING", "STOPPING"):
+                    self._job_run_id = previous_job_run_id
+            except Exception as e:
+                self.log.warning("Failed to get previous Glue job run state: %s", e)
Review Comment:
```suggestion
            except Exception:
                self.log.warning("Failed to get previous Glue job run state", exc_info=True)
```
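For context, the difference this suggestion makes can be sketched with plain stdlib logging: `exc_info=True` attaches the full traceback to the record, whereas formatting the exception with `%s` keeps only its message (the `RuntimeError` below stands in for a real boto3 error; the logger name is arbitrary):

```python
import io
import logging

# Capture log output in memory so the effect of exc_info is visible.
buf = io.StringIO()
handler = logging.StreamHandler(buf)
log = logging.getLogger("glue_example")
log.addHandler(handler)
log.setLevel(logging.WARNING)

try:
    raise RuntimeError("ThrottlingException: rate exceeded")
except Exception:
    # exc_info=True appends the full traceback to the log record, so the
    # failure's origin is preserved without interpolating str(e) into the
    # message string.
    log.warning("Failed to get previous Glue job run state", exc_info=True)

output = buf.getvalue()
```

With `"... %s", e` the output would contain only the message line; with `exc_info=True` it also includes the `Traceback (most recent call last):` section.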
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
wilsonhooi86 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3723832283
Good day @henry3260,
Happy New Year and thank you so much for taking the initiative to add this
feature. It will be helpful.
I would like to clarify a specific scenario regarding a Glue job named
`glue_job_database_name_1`. This job is designed to handle a single schema but
uses a `tbl_name` argument to process different tables dynamically. The script
logic adapts based on the table name passed during execution.
Assuming one DAG with three GlueJobOperator tasks calling the same Glue job
name `glue_job_database_name_1` in parallel.
Assuming `task_id="table_1"` and `task_id="table_2"` are still running Glue
jobs. If `task_id="table_3"` suddenly fails due to some internal error and
retries, will it be able to find the same `previous_glue_job_id`?
```
table_1 = GlueJobOperator(
    task_id="table_1",
    job_name="glue_job_database_name_1",
    verbose=False,
    script_args={
        "--tbl_name": "table_1",
    },
    resume_glue_job_on_retry=True,
    retry_limit=3,
)

table_2 = GlueJobOperator(
    task_id="table_2",
    job_name="glue_job_database_name_1",
    verbose=False,
    script_args={
        "--tbl_name": "table_2",
    },
    resume_glue_job_on_retry=True,
    retry_limit=3,
)

table_3 = GlueJobOperator(
    task_id="table_3",
    job_name="glue_job_database_name_1",
    verbose=False,
    script_args={
        "--tbl_name": "table_3",
    },
    resume_glue_job_on_retry=True,
    retry_limit=3,
)
```
Thanks and let me know if you need further clarification
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
o-nikolas commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2662944645
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
Review Comment:
Fair, I think I'll have to see the code to really say for sure. So feel free
to push it :slightly_smiling_face:
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
henry3260 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2662646108
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
Review Comment:
> I see you re-requested a review for this, but you haven't made the code
changes discussed above. Did you forget to push?
Sorry for the confusion. I haven't pushed the changes yet because I wanted
to confirm the approach with you first.
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
o-nikolas commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2662640098
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
Review Comment:
I see you re-requested a review for this, but you haven't made the code
changes discussed above. Did you forget to push?
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
henry3260 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2621678059
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
Review Comment:
You are absolutely right. I overlooked that Airflow clears XCom data upon
task retry.
Instead of using XCom, I plan to use the
[get_job_runs](https://docs.aws.amazon.com/glue/latest/webapi/API_GetJobRuns.html)
API to query AWS directly. This allows us to retrieve the JobRunId of any
active run (filtering for RUNNING or STARTING states) and resume monitoring
it, instead of starting a new duplicate job. What do you think? Thanks for
reviewing!
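A rough sketch of that proposed fallback, assuming the response shape of the AWS Glue GetJobRuns API (`find_active_run` is a hypothetical helper, not code from the PR; a real call would be `self.hook.conn.get_job_runs(JobName=...)` and would need to handle pagination via `NextToken`):

```python
# States in which an existing run should be resumed rather than duplicated.
ACTIVE_STATES = ("RUNNING", "STARTING")

def find_active_run(response):
    """Return the Id of the first active run in a GetJobRuns response, else None."""
    for run in response.get("JobRuns", []):
        if run.get("JobRunState") in ACTIVE_STATES:
            return run.get("Id")
    return None

# Fake response in the shape returned by glue_client.get_job_runs(JobName=...)
response = {
    "JobRuns": [
        {"Id": "jr_retry_me", "JobRunState": "RUNNING"},
        {"Id": "jr_earlier", "JobRunState": "FAILED"},
    ]
}
active_run_id = find_active_run(response)
```

Note that without an XCom-stored run ID, this approach cannot distinguish parallel runs of the same job name started by different tasks, which is relevant to the scenario discussed above.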
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
o-nikolas commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2620770789
##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
:return: the current Glue job ID.
"""
-self.log.info(
-"Initializing AWS Glue Job: %s. Wait for completion: %s",
-self.job_name,
-self.wait_for_completion,
-)
-glue_job_run = self.hook.initialize_job(self.script_args,
self.run_job_kwargs)
-self._job_run_id = glue_job_run["JobRunId"]
+previous_job_run_id = None
+if self.resume_glue_job_on_retry:
+ti = context.get("ti")
+if ti:
+previous_job_run_id = ti.xcom_pull(key="glue_job_run_id",
task_ids=ti.task_id)
Review Comment:
So this is supposed to read the job id from a previous try of the same task
during the same dag run? I would have thought we reset xcom for the next try?
Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]
potiuk commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3650635612
Looks good - but likely @vincbeck @o-nikolas @ferruzzi @ramitkataria
