Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-02-10 Thread via GitHub


henry3260 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3879095485

   > LGTM! I'll merge if and when the CI is green. While the CI is running, please try to avoid making additional changes so I can merge it right after it (hopefully) ends successfully.
   
   Thanks <3


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-02-10 Thread via GitHub


henry3260 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3879077546

   Hi @wilsonhooi86! Yes, it will find the same previous_glue_job_id and skip creating a new Glue job, because when GlueJobOperator retries, it looks up only its own glue_job_run_id for that specific task_id.
   
   > Good Day @henry3260,
   > 
   > Happy New Year and thank you so much for taking the initiative to add this 
feature. It will be helpful.
   > 
   > I would like to clarify a specific scenario regarding a Glue job named `glue_job_database_name_1`. This job is designed to handle a single schema but uses a `tbl_name` argument to process different tables dynamically. The script logic adapts based on the table name passed during execution.
   > 
   > Assuming one DAG with three GlueJobOperator tasks calling the same Glue job name `glue_job_database_name_1` in parallel.
   > 
   > Assuming `task_id="table_1"` and `task_id="table_2"` are still running their Glue jobs: if `task_id="table_3"` suddenly fails due to some internal error and retries, will it be able to find the same `previous_glue_job_id` and avoid creating a new Glue job?
   > 
   > ```
   > table_1 = GlueJobOperator(
   >     task_id="table_1",
   >     job_name="glue_job_database_name_1",
   >     verbose=False,
   >     script_args={
   >         "--tbl_name": "table_1",
   >     },
   >     resume_glue_job_on_retry=True,
   >     retry_limit=3,
   > )
   >
   > table_2 = GlueJobOperator(
   >     task_id="table_2",
   >     job_name="glue_job_database_name_1",
   >     verbose=False,
   >     script_args={
   >         "--tbl_name": "table_2",
   >     },
   >     resume_glue_job_on_retry=True,
   >     retry_limit=3,
   > )
   >
   > table_3 = GlueJobOperator(
   >     task_id="table_3",
   >     job_name="glue_job_database_name_1",
   >     verbose=False,
   >     script_args={
   >         "--tbl_name": "table_3",
   >     },
   >     resume_glue_job_on_retry=True,
   >     retry_limit=3,
   > )
   > ```
   > 
   > Thanks and let me know if you need further clarification
   
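The per-task_id isolation described in the reply above can be sketched with a toy stand-in for the TaskInstance XCom store. The `FakeTI` class and the run ids below are illustrative only, not Airflow's actual implementation: XCom values are keyed by (task_id, key), so parallel tasks sharing one job_name never read each other's run id.

```python
# Toy model of XCom keyed by (task_id, key): each task instance only
# sees the value it pushed under its own task_id.
class FakeTI:
    _store = {}  # (task_id, key) -> value, shared like the metadata DB

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        FakeTI._store[(self.task_id, key)] = value

    def xcom_pull(self, key, task_ids):
        return FakeTI._store.get((task_ids, key))

# Three parallel tasks push their own Glue run ids...
for task_id, run_id in [("table_1", "jr_a"), ("table_2", "jr_b"), ("table_3", "jr_c")]:
    FakeTI(task_id).xcom_push(key="glue_job_run_id", value=run_id)

# ...and on retry, table_3 pulls only the run id it pushed itself.
retrying = FakeTI("table_3")
print(retrying.xcom_pull(key="glue_job_run_id", task_ids=retrying.task_id))  # jr_c
```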


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-02-10 Thread via GitHub


henry3260 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2788760876


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+            if previous_job_run_id:
+                try:
+                    job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                    state = job_run.get("JobRun", {}).get("JobRunState")
+                    self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                    if state in ("RUNNING", "STARTING", "STOPPING"):
+                        self._job_run_id = previous_job_run_id
+                except Exception as e:
+                    self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+            ti = context.get("ti")
+            if ti:
+                ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)

Review Comment:
   > Clarification - don't feel pressured at any stance, but if it's only a 
matter of small fixes I'll be happy to include it in this release :)
   
   Yeah, I will push soon. Thanks for your kindness!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-02-10 Thread via GitHub


shahar1 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2788659592


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+            if previous_job_run_id:
+                try:
+                    job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                    state = job_run.get("JobRun", {}).get("JobRunState")
+                    self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                    if state in ("RUNNING", "STARTING", "STOPPING"):
+                        self._job_run_id = previous_job_run_id
+                except Exception as e:
+                    self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+            ti = context.get("ti")
+            if ti:
+                ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)

Review Comment:
   Clarification - don't feel pressured at any stance, but if it's only a 
matter of small fixes I'll be happy to include it in this release :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-02-10 Thread via GitHub


shahar1 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2788653826


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+            if previous_job_run_id:
+                try:
+                    job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                    state = job_run.get("JobRun", {}).get("JobRunState")
+                    self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                    if state in ("RUNNING", "STARTING", "STOPPING"):
+                        self._job_run_id = previous_job_run_id
+                except Exception as e:
+                    self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+            ti = context.get("ti")
+            if ti:
+                ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)

Review Comment:
   > > This seems overly defensive to me; `context["ti"]` should always be 
available.
   > 
   > Applied!
   
   Thanks! Are you pushing the changes soon? (I'm cutting the release in an hour or so.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-02-10 Thread via GitHub


henry3260 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2788630351


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+            if previous_job_run_id:
+                try:
+                    job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                    state = job_run.get("JobRun", {}).get("JobRunState")
+                    self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                    if state in ("RUNNING", "STARTING", "STOPPING"):
+                        self._job_run_id = previous_job_run_id
+                except Exception as e:
+                    self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+            ti = context.get("ti")
+            if ti:
+                ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)

Review Comment:
   > This seems overly defensive to me; `context["ti"]` should always be 
available.
   
   Applied!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-02-10 Thread via GitHub


henry3260 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3877553837

   > @henry3260 Could you please address the open issues?
   
   Sorry for the late update. I'll address them shortly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-02-10 Thread via GitHub


shahar1 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3877529874

   @henry3260 Could you please address the open issues?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-01-08 Thread via GitHub


uranusjr commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2674290887


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+            if previous_job_run_id:
+                try:
+                    job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                    state = job_run.get("JobRun", {}).get("JobRunState")
+                    self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                    if state in ("RUNNING", "STARTING", "STOPPING"):
+                        self._job_run_id = previous_job_run_id
+                except Exception as e:
+                    self.log.warning("Failed to get previous Glue job run state: %s", e)
+
+        if not self._job_run_id:
+            self.log.info(
+                "Initializing AWS Glue Job: %s. Wait for completion: %s",
+                self.job_name,
+                self.wait_for_completion,
+            )
+            glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
+            self._job_run_id = glue_job_run["JobRunId"]
+            ti = context.get("ti")
+            if ti:
+                ti.xcom_push(key="glue_job_run_id", value=self._job_run_id)

Review Comment:
   This seems overly defensive to me; `context["ti"]` should always be 
available.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-01-08 Thread via GitHub


uranusjr commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2674289583


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)
+            if previous_job_run_id:
+                try:
+                    job_run = self.hook.conn.get_job_run(JobName=self.job_name, RunId=previous_job_run_id)
+                    state = job_run.get("JobRun", {}).get("JobRunState")
+                    self.log.info("Previous Glue job_run_id: %s, state: %s", previous_job_run_id, state)
+                    if state in ("RUNNING", "STARTING", "STOPPING"):
+                        self._job_run_id = previous_job_run_id
+                except Exception as e:
+                    self.log.warning("Failed to get previous Glue job run state: %s", e)

Review Comment:
   ```suggestion
                except Exception:
                    self.log.warning("Failed to get previous Glue job run state", exc_info=True)
   ```
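For reference, the difference this suggestion makes can be demonstrated with plain stdlib logging: `exc_info=True` appends the full formatted traceback to the log record, whereas interpolating `%s` with the exception only logs its message. The logger name and message below are illustrative.

```python
import io
import logging

# Capture log output in a buffer so the effect of exc_info=True is visible.
buf = io.StringIO()
log = logging.getLogger("glue_demo")
log.addHandler(logging.StreamHandler(buf))
log.propagate = False

try:
    raise ValueError("boom")
except Exception:
    # exc_info=True makes the handler append the formatted traceback.
    log.warning("Failed to get previous Glue job run state", exc_info=True)

output = buf.getvalue()
print("Traceback" in output and "ValueError: boom" in output)  # True
```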



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-01-08 Thread via GitHub


wilsonhooi86 commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3723832283

   Good Day @henry3260,
   
   Happy New Year and thank you so much for taking the initiative to add this 
feature. It will be helpful.
   
   I would like to clarify a specific scenario regarding a Glue job named `glue_job_database_name_1`. This job is designed to handle a single schema but uses a `tbl_name` argument to process different tables dynamically. The script logic adapts based on the table name passed during execution.
   
   Assuming one DAG with three GlueJobOperator tasks calling the same Glue job name `glue_job_database_name_1` in parallel.
   
   Assuming `task_id="table_1"` and `task_id="table_2"` are still running their Glue jobs: if `task_id="table_3"` suddenly fails due to some internal error and retries, will it be able to find the same `previous_glue_job_id`?
   
   ```
   table_1 = GlueJobOperator(
       task_id="table_1",
       job_name="glue_job_database_name_1",
       verbose=False,
       script_args={
           "--tbl_name": "table_1",
       },
       resume_glue_job_on_retry=True,
       retry_limit=3,
   )

   table_2 = GlueJobOperator(
       task_id="table_2",
       job_name="glue_job_database_name_1",
       verbose=False,
       script_args={
           "--tbl_name": "table_2",
       },
       resume_glue_job_on_retry=True,
       retry_limit=3,
   )

   table_3 = GlueJobOperator(
       task_id="table_3",
       job_name="glue_job_database_name_1",
       verbose=False,
       script_args={
           "--tbl_name": "table_3",
       },
       resume_glue_job_on_retry=True,
       retry_limit=3,
   )
   ```
   
   Thanks, and let me know if you need further clarification.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-01-05 Thread via GitHub


o-nikolas commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2662944645


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)

Review Comment:
   Fair, I think I'll have to see the code to really say for sure. So feel free 
to push it :slightly_smiling_face: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-01-05 Thread via GitHub


henry3260 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2662646108


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)

Review Comment:
   > I see you re-requested a review for this, but you haven't made the code 
changes discussed above. Did you forget to push?
   
   Sorry for the confusion. I haven't pushed the changes yet because I wanted 
to confirm the approach with you first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2026-01-05 Thread via GitHub


o-nikolas commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2662640098


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)

Review Comment:
   I see you re-requested a review for this, but you haven't made the code 
changes discussed above. Did you forget to push?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2025-12-15 Thread via GitHub


henry3260 commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2621678059


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)

Review Comment:
   You are absolutely right. I overlooked that Airflow clears XCom data upon 
task retry.
   
   Instead of using XCom, I plan to use the [get_job_runs](https://docs.aws.amazon.com/glue/latest/webapi/API_GetJobRuns.html) API to query AWS directly. This allows us to retrieve the JobRunId of any active run (filtering for RUNNING or STARTING states) and resume monitoring it instead of starting a duplicate job. What do you think? Thanks for reviewing!
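A minimal sketch of that idea, assuming a boto3-style Glue client is passed in. The `find_active_run_id` helper name and the single-page handling are illustrative only; a real implementation would also follow the `GetJobRuns` pagination token.

```python
ACTIVE_STATES = ("RUNNING", "STARTING")

def find_active_run_id(client, job_name):
    """Return the JobRunId of a still-active run of job_name, or None.

    `client` is expected to expose get_job_runs(JobName=...) like a
    boto3 Glue client; only the first page of results is inspected here.
    """
    response = client.get_job_runs(JobName=job_name)
    for run in response.get("JobRuns", []):
        if run.get("JobRunState") in ACTIVE_STATES:
            return run["JobRunId"]
    return None
```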



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2025-12-15 Thread via GitHub


o-nikolas commented on code in PR #59392:
URL: https://github.com/apache/airflow/pull/59392#discussion_r2620770789


##
providers/amazon/src/airflow/providers/amazon/aws/operators/glue.py:
##
@@ -217,13 +219,33 @@ def execute(self, context: Context):
 
         :return: the current Glue job ID.
         """
-        self.log.info(
-            "Initializing AWS Glue Job: %s. Wait for completion: %s",
-            self.job_name,
-            self.wait_for_completion,
-        )
-        glue_job_run = self.hook.initialize_job(self.script_args, self.run_job_kwargs)
-        self._job_run_id = glue_job_run["JobRunId"]
+        previous_job_run_id = None
+        if self.resume_glue_job_on_retry:
+            ti = context.get("ti")
+            if ti:
+                previous_job_run_id = ti.xcom_pull(key="glue_job_run_id", task_ids=ti.task_id)

Review Comment:
   So this is supposed to read the job id from a previous try of the same task during the same DAG run? I would have thought we reset XCom for the next try?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] feat: Add resume_glue_job_on_retry to GlueJobOperator [airflow]

2025-12-14 Thread via GitHub


potiuk commented on PR #59392:
URL: https://github.com/apache/airflow/pull/59392#issuecomment-3650635612

   Looks good - but likely @vincbeck @o-nikolas @ferruzzi @ramitkataria 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]