Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-09 Thread via GitHub


o-nikolas merged PR #38514:
URL: https://github.com/apache/airflow/pull/38514


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-08 Thread via GitHub


o-nikolas commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1556103085


##
airflow/jobs/job.py:
##
@@ -104,12 +106,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor

Review Comment:
   I'll cut a good-first-issue for this :+1:






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-08 Thread via GitHub


uranusjr commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1555276527


##
airflow/jobs/job.py:
##
@@ -104,12 +106,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor

Review Comment:
   Reminder to remove this, either here directly or in a later PR.






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-07 Thread via GitHub


potiuk commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1555067360


##
airflow/jobs/job.py:
##
@@ -104,12 +105,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor
+            self.executors = [executor]

Review Comment:
   I also don't think Job should eventually keep extra fields that are not 
part of the data model. It's really a hack, as Job is an ORM model. Both 
executor (executors) and heartrate, currently kept in Job, should live somewhere 
else if you ask me. Part of this was addressed when I split Job to have a 
separate Job Runner, and we should likely eventually move both heartbeat and 
executor(s) to the runners (but likely in a separate PR).
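
A minimal sketch of the separation described above, using plain dataclasses instead of Airflow's real ORM model and runner classes. `JobRecord`, `JobRunner`, and `FakeExecutor` are hypothetical names chosen for illustration, and the default `heartrate` value is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class FakeExecutor:
    """Hypothetical stand-in for an executor; not Airflow's BaseExecutor."""

    name: str


@dataclass
class JobRecord:
    """Stand-in for the ORM model: only fields that would be persisted."""

    id: int
    hostname: str
    state: str = "running"


@dataclass
class JobRunner:
    """Holds runtime-only state that does not belong in the data model."""

    job: JobRecord
    executors: list = field(default_factory=list)
    heartrate: float = 5.0  # assumed default, for illustration only


runner = JobRunner(
    job=JobRecord(id=1, hostname="scheduler-1"),
    executors=[FakeExecutor("local"), FakeExecutor("celery")],
)
```

With this split the persisted record carries no executor or heartrate attributes, and the runner owns them.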






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-05 Thread via GitHub


o-nikolas commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1554018702


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -819,19 +821,20 @@ def _execute(self) -> int | None:
         )
 
         try:
-            self.job.executor.job_id = self.job.id
-            if self.processor_agent:
-                self.log.debug("Using PipeCallbackSink as callback sink.")
-                self.job.executor.callback_sink = PipeCallbackSink(
-                    get_sink_pipe=self.processor_agent.get_callbacks_pipe
-                )
-            else:
-                from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
+            for executor in self.job.executors:
+                executor.job_id = self.job.id
+                if self.processor_agent:
+                    self.log.debug("Using PipeCallbackSink as callback sink.")

Review Comment:
   Changes made and PR updated!






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-04 Thread via GitHub


dstandish commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1552594898


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -270,8 +270,10 @@ def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
 
         self.log.info("%s\n%s received, printing debug\n%s", "-" * 80, sig_name, "-" * 80)
 
-        self.job.executor.debug_dump()
-        self.log.info("-" * 80)
+        for executor in self.job.executors:
+            self.log.info("Debug dump for the executor %s", executor)
+            executor.debug_dump()
+            self.log.info("-" * 80)

Review Comment:
   Yup, just explaining






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-04 Thread via GitHub


o-nikolas commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1552549164


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -270,8 +270,10 @@ def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
 
         self.log.info("%s\n%s received, printing debug\n%s", "-" * 80, sig_name, "-" * 80)
 
-        self.job.executor.debug_dump()
-        self.log.info("-" * 80)
+        for executor in self.job.executors:
+            self.log.info("Debug dump for the executor %s", executor)
+            executor.debug_dump()
+            self.log.info("-" * 80)

Review Comment:
   Yupp, again, I don't disagree! I just don't think this PR is the right place 
for that migration. I'd like these to really just be a porting of existing 
behaviour to multi executor, rather than intertwining them with new initiatives 
or other functional changes.






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-03 Thread via GitHub


dstandish commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1550099456


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -270,8 +270,10 @@ def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
 
         self.log.info("%s\n%s received, printing debug\n%s", "-" * 80, sig_name, "-" * 80)
 
-        self.job.executor.debug_dump()
-        self.log.info("-" * 80)
+        for executor in self.job.executors:
+            self.log.info("Debug dump for the executor %s", executor)
+            executor.debug_dump()
+            self.log.info("-" * 80)

Review Comment:
   Yeah, to explain the motivation: nowadays we commonly use logging tools 
where we search and filter messages to find what we are looking for, whereas 
earlier we were perhaps more likely to copy the log file locally, open it up, 
and search through it. So I think it becomes more important to ensure that each 
individual log message has the right context info attached to it, and I think 
you probably also want more content on fewer lines. If you are just opening the 
file, by contrast, you want stuff pretty-printed and no long lines.
   
   Take the base executor debug dump as an example: if you are searching 
through Splunk or something, you would probably prefer all of this in one long 
message instead of three, with appropriate tagging to identify it all as the 
log message for the single event of the debug dump:
   ```
   def debug_dump(self):
       """Get called in response to SIGUSR2 by the scheduler."""
       self.log.info(
           "executor.queued (%d)\n\t%s",
           len(self.queued_tasks),
           "\n\t".join(map(repr, self.queued_tasks.items())),
       )
       self.log.info("executor.running (%d)\n\t%s", len(self.running), "\n\t".join(map(repr, self.running)))
       self.log.info(
           "executor.event_buffer (%d)\n\t%s",
           len(self.event_buffer),
           "\n\t".join(map(repr, self.event_buffer.items())),
       )
   ```
   
   Then you would not need `self.log.info("Debug dump for the executor %s", 
executor)` either, and the `---` does not add anything in that context.  And I 
think we should probably move in that direction over time.
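
One way the "single message per event" idea could look, as a rough sketch rather than a proposal for the actual `debug_dump` implementation. `format_debug_dump` and the logger name are hypothetical; the section labels mirror the ones in the snippet above.

```python
import logging

log = logging.getLogger("executor_debug_sketch")


def format_debug_dump(name, queued_tasks, running, event_buffer):
    """Build one multi-line message covering all three sections of the dump."""
    sections = [
        ("queued", list(queued_tasks.items())),
        ("running", sorted(running)),
        ("event_buffer", list(event_buffer.items())),
    ]
    # The first line tags the whole message with the executor it belongs to.
    lines = [f"debug dump for executor {name}"]
    for label, items in sections:
        lines.append(f"executor.{label} ({len(items)})")
        lines.extend(f"\t{item!r}" for item in items)
    return "\n".join(lines)


message = format_debug_dump(
    name="LocalExecutor",
    queued_tasks={"task_a": "queued-command"},
    running={"task_b"},
    event_buffer={},
)
# One log record for the whole event, instead of three separate ones.
log.info("%s", message)
```

Because the executor name is part of the message itself, neither a separate "Debug dump for the executor" line nor a separator line is needed to tell the dumps apart.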






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-03 Thread via GitHub


dstandish commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1549952673


##
airflow/jobs/job.py:
##
@@ -104,12 +105,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor
+            self.executors = [executor]

Review Comment:
   Yeah I don't think Job should be public, whether it is or not.  So updating 
it, and not having sorta duplicate attrs, feels like the right move.






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


uranusjr commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548745509


##
airflow/jobs/job.py:
##
@@ -104,12 +105,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor
+            self.executors = [executor]

Review Comment:
   I don’t think Job is in the supported public API, so we can probably ignore 
compat issues. But let’s discuss this in a separate issue or PR. The test 
changes would be a bit noisy too and are better done separately.






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


o-nikolas commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548704690


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -819,19 +821,20 @@ def _execute(self) -> int | None:
         )
 
         try:
-            self.job.executor.job_id = self.job.id
-            if self.processor_agent:
-                self.log.debug("Using PipeCallbackSink as callback sink.")
-                self.job.executor.callback_sink = PipeCallbackSink(
-                    get_sink_pipe=self.processor_agent.get_callbacks_pipe
-                )
-            else:
-                from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
+            for executor in self.job.executors:
+                executor.job_id = self.job.id
+                if self.processor_agent:
+                    self.log.debug("Using PipeCallbackSink as callback sink.")

Review Comment:
   Happy to share the sink between all executors, I'll make the change along 
with the above change :)
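
For readers following along, the two agreed changes (check once before the loop, share one sink object) could be sketched roughly as below. `PipeSink`, `DatabaseSink`, `FakeExecutor`, and `attach_sink` are hypothetical stand-ins, not the classes or code from the PR.

```python
class PipeSink:
    """Hypothetical stand-in for a pipe-backed callback sink."""


class DatabaseSink:
    """Hypothetical stand-in for a database-backed callback sink."""


class FakeExecutor:
    """Hypothetical executor with the two attributes set in the diff."""

    def __init__(self):
        self.job_id = None
        self.callback_sink = None


def attach_sink(executors, job_id, has_processor_agent):
    # Decide on the sink type once, before looping over the executors.
    sink = PipeSink() if has_processor_agent else DatabaseSink()
    for executor in executors:
        executor.job_id = job_id
        # Every executor receives the same sink object.
        executor.callback_sink = sink
    return sink


executors = [FakeExecutor(), FakeExecutor()]
shared = attach_sink(executors, job_id=42, has_processor_agent=True)
```

Hoisting the branch also means any "using X as callback sink" log message would be emitted once rather than once per executor.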






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


o-nikolas commented on PR #38514:
URL: https://github.com/apache/airflow/pull/38514#issuecomment-2033219581

   > on executor end... i looked at the way executors have implemented end... i 
was surprised to see that they wait for all tasks to finish. given that we have 
re-adoption logic, i wonder if it actually makes sense to do that. why not just 
stop running and let the new scheduler (presumably there will be one?) pick up 
the tasks. perhaps it could hasten the process by unassigning i.e. 
disassociating from itself its tasks.
   
   The executor interface has both an `end`, which waits synchronously, and a 
`terminate`, which ends more aggressively (closer to what you describe) when a 
SIGTERM is received. It's up to executors to be able to do both. I suppose we 
could move the scheduler to call `terminate` in more cases than just when a 
SIGTERM is received. But that needs more thinking and is certainly out of scope 
for this PR, which is just to ensure that the current behaviour we have today 
works with multiple executors.
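
A small sketch of the distinction between the two shutdown paths when several executors are configured. `FakeExecutor` and `shut_down` are hypothetical; how the scheduler actually wires signals to these calls is not shown in this thread and is assumed here.

```python
class FakeExecutor:
    """Hypothetical executor that records which shutdown call it received."""

    def __init__(self, name):
        self.name = name
        self.calls = []

    def end(self):
        # Graceful path: a real executor would wait for running work here.
        self.calls.append("end")

    def terminate(self):
        # Aggressive path: a real executor would stop without waiting.
        self.calls.append("terminate")


def shut_down(executors, got_sigterm):
    """Apply the same shutdown call to every configured executor."""
    for executor in executors:
        if got_sigterm:
            executor.terminate()
        else:
            executor.end()


graceful = [FakeExecutor("a"), FakeExecutor("b")]
shut_down(graceful, got_sigterm=False)

forced = [FakeExecutor("a"), FakeExecutor("b")]
shut_down(forced, got_sigterm=True)
```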





Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


o-nikolas commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548690491


##
airflow/jobs/job.py:
##
@@ -104,12 +105,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor
+            self.executors = [executor]

Review Comment:
   It is used in tests. I suppose we could update all the tests that use it and 
replace that with mocking? I'm also not sure if that would trigger something 
backcompat related by changing the signature of this class. It seemed a bit out 
of scope for this PR.






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


o-nikolas commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548693252


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -270,8 +270,10 @@ def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
 
         self.log.info("%s\n%s received, printing debug\n%s", "-" * 80, sig_name, "-" * 80)
 
-        self.job.executor.debug_dump()
-        self.log.info("-" * 80)
+        for executor in self.job.executors:
+            self.log.info("Debug dump for the executor %s", executor)
+            executor.debug_dump()
+            self.log.info("-" * 80)

Review Comment:
   I don't necessarily disagree with you, but this is what was done 
previously. With these changes I try not to snowball the scope of the PR.
   
   Today people know to expect that `---...` header when they're looking through 
the logs for the executor debug dump. I don't think this PR should be 
opinionated about that and change it.






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


o-nikolas commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548690491


##
airflow/jobs/job.py:
##
@@ -104,12 +105,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor
+            self.executors = [executor]

Review Comment:
   It is used in tests. I suppose we could remove all the tests that use it and 
replace that with mocking? I'm also not sure if that would trigger something 
backcompat related by changing the signature of this class.






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


dstandish commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548639831


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -819,19 +821,20 @@ def _execute(self) -> int | None:
         )
 
         try:
-            self.job.executor.job_id = self.job.id
-            if self.processor_agent:
-                self.log.debug("Using PipeCallbackSink as callback sink.")
-                self.job.executor.callback_sink = PipeCallbackSink(
-                    get_sink_pipe=self.processor_agent.get_callbacks_pipe
-                )
-            else:
-                from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
+            for executor in self.job.executors:
+                executor.job_id = self.job.id
+                if self.processor_agent:
+                    self.log.debug("Using PipeCallbackSink as callback sink.")

Review Comment:
   any reason they should not use the same sink instance?






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


dstandish commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548638986


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -270,8 +270,10 @@ def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
 
         self.log.info("%s\n%s received, printing debug\n%s", "-" * 80, sig_name, "-" * 80)
 
-        self.job.executor.debug_dump()
-        self.log.info("-" * 80)
+        for executor in self.job.executors:
+            self.log.info("Debug dump for the executor %s", executor)
+            executor.debug_dump()
+            self.log.info("-" * 80)

Review Comment:
   e.g. alternatively we can ensure that each log line has sufficient context 
info so it can be understood which executor it comes from...
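
As an illustration of attaching context to every line, the standard library's `logging.LoggerAdapter` can prefix each message with the executor it came from. This is only a sketch of the general technique; `ExecutorLogAdapter`, `ListHandler`, and the logger name are hypothetical and not part of Airflow.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted records in memory so the output can be inspected."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


class ExecutorLogAdapter(logging.LoggerAdapter):
    """Prefixes every message with the executor name stored in `extra`."""

    def process(self, msg, kwargs):
        return f"[executor={self.extra['executor']}] {msg}", kwargs


handler = ListHandler()
base_log = logging.getLogger("executor_context_sketch")
base_log.setLevel(logging.INFO)
base_log.propagate = False
base_log.addHandler(handler)

for name in ("LocalExecutor", "CeleryExecutor"):
    log = ExecutorLogAdapter(base_log, {"executor": name})
    log.info("executor.running (%d)", 0)
```

Each record is then searchable by executor name on its own, without relying on neighbouring header or separator lines.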






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


dstandish commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548638498


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -270,8 +270,10 @@ def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
 
         self.log.info("%s\n%s received, printing debug\n%s", "-" * 80, sig_name, "-" * 80)
 
-        self.job.executor.debug_dump()
-        self.log.info("-" * 80)
+        for executor in self.job.executors:
+            self.log.info("Debug dump for the executor %s", executor)
+            executor.debug_dump()
+            self.log.info("-" * 80)

Review Comment:
   ```suggestion
   ```
   I don't think it's good practice to emit these kinds of `---` log lines 
with no content.






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


dstandish commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548637266


##
airflow/jobs/job.py:
##
@@ -104,12 +105,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor
+            self.executors = [executor]

Review Comment:
   if `executor` is not used, perhaps we should just replace it?






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-02 Thread via GitHub


o-nikolas commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1548204315


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -819,19 +821,20 @@ def _execute(self) -> int | None:
         )
 
         try:
-            self.job.executor.job_id = self.job.id
-            if self.processor_agent:
-                self.log.debug("Using PipeCallbackSink as callback sink.")
-                self.job.executor.callback_sink = PipeCallbackSink(
-                    get_sink_pipe=self.processor_agent.get_callbacks_pipe
-                )
-            else:
-                from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
+            for executor in self.job.executors:
+                executor.job_id = self.job.id
+                if self.processor_agent:
+                    self.log.debug("Using PipeCallbackSink as callback sink.")

Review Comment:
   > The if-else check should only be done once to begin with.
   
   Fair point, I can do that check just once before the loop
   
   > Since it is not possible for executors to have different callback sinks
   
   Do you mean to say that executors can't have different _types_ of callback 
sinks? Or that they should literally share the same in-memory sink object 
between all executors?
   
   @uranusjr 






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-01 Thread via GitHub


uranusjr commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1547079296


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -819,19 +821,20 @@ def _execute(self) -> int | None:
         )
 
         try:
-            self.job.executor.job_id = self.job.id
-            if self.processor_agent:
-                self.log.debug("Using PipeCallbackSink as callback sink.")
-                self.job.executor.callback_sink = PipeCallbackSink(
-                    get_sink_pipe=self.processor_agent.get_callbacks_pipe
-                )
-            else:
-                from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
+            for executor in self.job.executors:
+                executor.job_id = self.job.id
+                if self.processor_agent:
+                    self.log.debug("Using PipeCallbackSink as callback sink.")

Review Comment:
   Since it is not possible for executors to have different callback sinks, 
it’s probably better to only show this message once. The if-else check should 
only be done once to begin with.






Re: [PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-04-01 Thread via GitHub


o-nikolas commented on code in PR #38514:
URL: https://github.com/apache/airflow/pull/38514#discussion_r1540104792


##
airflow/jobs/job.py:
##
@@ -104,12 +105,13 @@ class Job(Base, LoggingMixin):
     Only makes sense for SchedulerJob and BackfillJob instances.
     """
 
-    def __init__(self, executor=None, heartrate=None, **kwargs):
+    def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
         # Save init parameters as DB fields
         self.heartbeat_failed = False
         self.hostname = get_hostname()
         if executor:
             self.executor = executor
+            self.executors = [executor]

Review Comment:
   In reality the `executor` parameter to `__init__` isn't used in production, 
so I didn't add an `executors` field to match it. To cover the tests that use 
it, I just take `executor` and update the `executors` field with that input.
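
The behaviour described here can be sketched with a simplified stand-in class. `JobSketch` and `FakeExecutor` are hypothetical; in particular, starting with an empty `executors` list when no executor is passed is an assumption, since this thread does not show how the list is populated in production.

```python
from typing import Optional


class FakeExecutor:
    """Hypothetical stand-in for an executor instance."""


class JobSketch:
    """Simplified stand-in for Job; only the executor handling is modelled."""

    def __init__(self, executor: Optional[FakeExecutor] = None):
        # Assumption: without an explicit executor the list starts empty.
        self.executors: list = []
        if executor:
            # Keep the old single attribute for tests that still read it,
            # and mirror the same instance into the new list.
            self.executor = executor
            self.executors = [executor]


injected = FakeExecutor()
job = JobSketch(executor=injected)
```

Code that loops over `job.executors` then works unchanged for tests that still inject a single executor through `__init__`.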






[PR] Implement start/end/debug for multiple executors in scheduler job [airflow]

2024-03-26 Thread via GitHub


o-nikolas opened a new pull request, #38514:
URL: https://github.com/apache/airflow/pull/38514

   This PR delivers the first set of changes to the scheduler job to transform 
it to support hybrid executors. It covers starting and ending the executors as 
well as debug dump.
   
   
   
   
   
   
   
   

