Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
jscheffl commented on PR #59218: URL: https://github.com/apache/airflow/pull/59218#issuecomment-3712178871 #protm -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
vincbeck merged PR #59218:
URL: https://github.com/apache/airflow/pull/59218
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
jscheffl commented on PR #59218:
URL: https://github.com/apache/airflow/pull/59218#issuecomment-3710609105
For safety, I made a final rebase; let's get CI green and then merge ASAP!
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Nataneljpwd commented on PR #59218:
URL: https://github.com/apache/airflow/pull/59218#issuecomment-3707787551
> Interestingly, after the latest rebase, several deps get **down**graded:
> ```diff
> < cadwyn==6.0.0
> > cadwyn==5.4.6
> < fastapi==0.128.0
> > fastapi==0.117.1
> < starlette==0.50.0
> > starlette==0.48.0
> ```
I think @jscheffl wrote something about this in the CI/CD channel on Slack, where the pipeline got dependency conflicts on Arm; maybe it can help.
Here is the Slack message: https://apache-airflow.slack.com/archives/C015SLQF059/p1767483103162959?thread_ts=1767480352.315619&cid=C015SLQF059
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on PR #59218:
URL: https://github.com/apache/airflow/pull/59218#issuecomment-3707651812
Interestingly, after the latest rebase, several deps get **down**graded:
```diff
< cadwyn==6.0.0
> cadwyn==5.4.6
< fastapi==0.128.0
> fastapi==0.117.1
< starlette==0.50.0
> starlette==0.48.0
```
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on PR #59218:
URL: https://github.com/apache/airflow/pull/59218#issuecomment-3707285531
Lazy consensus passed. I guess we're waiting for @vincbeck's blessing now...
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Nataneljpwd commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2651184386
##
airflow-core/src/airflow/utils/db.py:
##
@@ -700,39 +721,101 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
-# Use AUTOCOMMIT for DDL to avoid metadata lock issues
-with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
-from alembic import command
+log.info("Stamping migration head")
+command.stamp(config, "head")
-log.info("Stamping migration head")
-command.stamp(config, "head")
+log.info("Airflow database tables created")
-log.info("Airflow database tables created")
+
+def _create_db_from_orm(session):
+"""Create database tables from ORM models and stamp alembic version."""
+log.info("Creating Airflow database tables from the ORM")
+_setup_debug_logging_if_needed()
+
+if get_dialect_name(session) == "mysql":
+_create_db_from_orm_mysql(session)
+else:
+_create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+import atexit
import faulthandler
-import threading
+from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
-# Enable Fault Handler
+# Enable faulthandler for debugging long-running threads and deadlocks,
+# but disable it before interpreter shutdown to avoid segfaults during
+# cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
-# Print Active Threads and Stack Traces Periodically
-def dump_stacks():
-while True:
-for thread_id, frame in sys._current_frames().items():
-log.info("\nThread %s stack:", thread_id)
-traceback.print_stack(frame)
-time.sleep(300)
+# Cancel any pending traceback dumps and disable faulthandler before exit
+# to prevent it from interfering with C extension cleanup
+def cleanup_faulthandler():
+with suppress(Exception):
+faulthandler.cancel_dump_traceback_later()
+with suppress(Exception):
+faulthandler.disable()
+
+atexit.register(cleanup_faulthandler)
+
+# Set up periodic traceback dumps for debugging hanging tests/threads
+faulthandler.dump_traceback_later(timeout=300, repeat=True, file=sys.stderr)
+
-threading.Thread(target=dump_stacks, daemon=True).start()
+@contextlib.contextmanager
+def _mysql_lock_session_for_migration(original_session: Session) -> Generator[Session, None, None]:
+"""
+Create a MySQL-specific lock session for migration operations.
+
+This context manager:
+1. Commits the original session to release metadata locks
+2. Creates a new session bound to the engine
+3. Ensures the session is properly closed on exit
+
+:param original_session: The original session to commit
+:return: A new session suitable for use with create_global_lock
+"""
+from sqlalchemy.orm import Session as SASession
+
+log.info("MySQL: Committing session to release metadata locks")
+original_session.commit()
+
+lock_session = SASession(bind=settings.engine)
Review Comment:
It seems that Copilot may have made a mistake here: `engine` and
`get_engine` are not SQLAlchemy settings; they belong to an Airflow module.
They do behave differently, since `get_engine` performs an additional check
rather than accessing the variable directly.
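To illustrate the distinction raised in this comment, here is a minimal sketch of how a getter with a guard differs from direct attribute access. The `settings` object and the not-`None` check are hypothetical stand-ins; the exact check Airflow's `get_engine` performs is not shown in this thread.

```python
from types import SimpleNamespace

# Hypothetical stand-in for a settings module; not Airflow's actual code.
settings = SimpleNamespace(engine=None)


def get_engine():
    """Return the configured engine, failing loudly if it was never set up."""
    if settings.engine is None:
        raise RuntimeError("engine is not configured yet")
    return settings.engine


# Direct attribute access silently yields None before configuration.
direct = settings.engine

# The getter surfaces the problem at the call site instead.
try:
    get_engine()
    getter_failed = False
except RuntimeError:
    getter_failed = True

# After a (pretend) configuration step both paths agree.
settings.engine = object()
configured = get_engine() is settings.engine
```

Under this assumption, the two access paths only diverge when the engine has not been configured, which is where a bare `None` would otherwise propagate into `Session(bind=...)`.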
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2650249171
##
airflow-core/src/airflow/utils/db.py:
##
@@ -700,39 +721,101 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
-# Use AUTOCOMMIT for DDL to avoid metadata lock issues
-with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
-from alembic import command
+log.info("Stamping migration head")
+command.stamp(config, "head")
-log.info("Stamping migration head")
-command.stamp(config, "head")
+log.info("Airflow database tables created")
-log.info("Airflow database tables created")
+
+def _create_db_from_orm(session):
+"""Create database tables from ORM models and stamp alembic version."""
+log.info("Creating Airflow database tables from the ORM")
+_setup_debug_logging_if_needed()
+
+if get_dialect_name(session) == "mysql":
+_create_db_from_orm_mysql(session)
+else:
+_create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+import atexit
import faulthandler
-import threading
+from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
-# Enable Fault Handler
+# Enable faulthandler for debugging long-running threads and deadlocks,
+# but disable it before interpreter shutdown to avoid segfaults during
+# cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
-# Print Active Threads and Stack Traces Periodically
-def dump_stacks():
-while True:
-for thread_id, frame in sys._current_frames().items():
-log.info("\nThread %s stack:", thread_id)
-traceback.print_stack(frame)
-time.sleep(300)
+# Cancel any pending traceback dumps and disable faulthandler before exit
+# to prevent it from interfering with C extension cleanup
+def cleanup_faulthandler():
+with suppress(Exception):
+faulthandler.cancel_dump_traceback_later()
+with suppress(Exception):
+faulthandler.disable()
+
+atexit.register(cleanup_faulthandler)
+
+# Set up periodic traceback dumps for debugging hanging tests/threads
+faulthandler.dump_traceback_later(timeout=300, repeat=True, file=sys.stderr)
+
-threading.Thread(target=dump_stacks, daemon=True).start()
+@contextlib.contextmanager
+def _mysql_lock_session_for_migration(original_session: Session) -> Generator[Session, None, None]:
+"""
+Create a MySQL-specific lock session for migration operations.
+
+This context manager:
+1. Commits the original session to release metadata locks
+2. Creates a new session bound to the engine
+3. Ensures the session is properly closed on exit
+
+:param original_session: The original session to commit
+:return: A new session suitable for use with create_global_lock
+"""
+from sqlalchemy.orm import Session as SASession
+
+log.info("MySQL: Committing session to release metadata locks")
+original_session.commit()
+
+lock_session = SASession(bind=settings.engine)
Review Comment:
Couldn't get this one working, ended up reverting it. The code behaves
differently on 2.x and 3.x - might need to add a compat shim or try/except.
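A compat shim of the kind mentioned here could look roughly like the sketch below. It assumes, purely for illustration, that one version of the settings module exposes a `get_engine()` function while another exposes only an `engine` attribute; `resolve_engine` and both stand-in objects are hypothetical names, not Airflow APIs.

```python
from types import SimpleNamespace


def resolve_engine(settings_module):
    """Prefer a get_engine() accessor when present, else fall back to the attribute."""
    getter = getattr(settings_module, "get_engine", None)
    if callable(getter):
        return getter()
    return settings_module.engine


# Hypothetical stand-ins for two differently shaped settings modules.
attribute_only = SimpleNamespace(engine="engine-from-attribute")
with_getter = SimpleNamespace(
    engine="engine-from-attribute",
    get_engine=lambda: "engine-from-getter",
)

from_attribute = resolve_engine(attribute_only)
from_getter = resolve_engine(with_getter)
```

Feature detection with `getattr` avoids hard-coding version numbers, at the cost of hiding which code path actually ran.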
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2648630938
##
providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py:
##
@@ -888,20 +888,26 @@ def test_get_db_manager(self, auth_manager):
@mock.patch("airflow.utils.db.drop_airflow_models")
@mock.patch("airflow.utils.db.drop_airflow_moved_tables")
@mock.patch("airflow.utils.db.initdb")
-@mock.patch("airflow.settings.engine.connect")
+@mock.patch("airflow.settings.engine")
Review Comment:
The current approach works correctly because
`mock.patch("airflow.settings.engine")` patches the module attribute that
`get_engine()` returns. The more critical issue (missing session bind dialect
mock) was already addressed in the context of a previous comment.
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Copilot commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2648327300
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1203,18 +1338,32 @@ def downgrade(*, to_revision, from_revision=None, show_sql_only=False, session:
log.info("Attempting downgrade to revision %s", to_revision)
config = _get_alembic_config()
+
# If downgrading to less than 3.0.0, we need to handle the FAB provider
if _revision_greater(config, _REVISION_HEADS_MAP["2.10.3"], to_revision):
_handle_fab_downgrade(session=session)
-with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
+
+# Determine which session to use for the migration operations
+if get_dialect_name(session) == "mysql":
+# MySQL: Commit session to release metadata locks before Alembic DDL
+session_cm: contextlib.AbstractContextManager[Session] = _mysql_lock_session_for_migration(session)
+else:
+# PostgreSQL / SQLite: Use original session
+session_cm = contextlib.nullcontext(session)
+
+with (
+session_cm as work_session,
+create_global_lock(session=work_session, lock=DBLocks.MIGRATIONS),
+):
if show_sql_only:
log.warning("Generating sql scripts for manual migration.")
if not from_revision:
-from_revision = _get_current_revision(session)
+from_revision = _get_current_revision(work_session)
revision_range = f"{from_revision}:{to_revision}"
_offline_migration(command.downgrade, config=config,
revision=revision_range)
else:
-log.info("Applying downgrade migrations to Airflow database.")
+dialect_label = " (MySQL)" if get_dialect_name(session) == "mysql" else ""
Review Comment:
Inconsistent session usage: Line 1365 calls `get_dialect_name(session)`
using the original session parameter, but we're inside a context where
`work_session` is the active session being used for operations. For
consistency, this should use `get_dialect_name(work_session)` instead. This
could be problematic for MySQL where the original session has been committed
and a new session created.
```suggestion
dialect_label = " (MySQL)" if get_dialect_name(work_session) == "mysql" else ""
```
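The session-selection pattern in this hunk can be sketched in isolation. The example below uses a hypothetical `FakeSession` class and helper names (`fresh_session_after_commit`, `session_cm_for`) that are not part of Airflow; it only shows how `contextlib.nullcontext` lets both branches share a single `with` block.

```python
import contextlib
from collections.abc import Generator


class FakeSession:
    """Hypothetical stand-in for an ORM session; records calls only."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.committed = False
        self.closed = False

    def commit(self) -> None:
        self.committed = True

    def close(self) -> None:
        self.closed = True


@contextlib.contextmanager
def fresh_session_after_commit(original: FakeSession) -> Generator[FakeSession, None, None]:
    """Commit the original session, then hand out a new one that is closed on exit."""
    original.commit()
    replacement = FakeSession("replacement")
    try:
        yield replacement
    finally:
        replacement.close()


def session_cm_for(dialect: str, session: FakeSession) -> contextlib.AbstractContextManager[FakeSession]:
    """Pick the context manager once so callers need only one `with` block."""
    if dialect == "mysql":
        return fresh_session_after_commit(session)
    # nullcontext yields the object unchanged and does nothing on exit.
    return contextlib.nullcontext(session)


original = FakeSession("original")

with session_cm_for("postgresql", original) as work_session:
    same_on_postgres = work_session is original

with session_cm_for("mysql", original) as work_session:
    different_on_mysql = work_session is not original
```

Because the object bound by `as` may differ from the function argument, everything inside the block should go through `work_session`, which is the consistency point the review comment makes.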
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1120,38 +1235,56 @@ def upgradedb(
exit(1)
if not _get_current_revision(session=session) and not to_revision:
-# Don't load default connections
# New DB; initialize and exit
initdb(session=session)
return
-with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
-import sqlalchemy.pool
-log.info("Migrating the Airflow database")
-val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
-try:
-# Reconfigure the ORM to use _EXACTLY_ one connection, otherwise
some db engines hang forever
-# trying to ALTER TABLEs
-os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE"] = "1"
-
settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
-command.upgrade(config, revision=to_revision or "heads")
-current_revision = _get_current_revision(session=session)
-with _configured_alembic_environment() as env:
-source_heads = env.script.get_heads()
-if current_revision == source_heads[0]:
-# Only run external DB upgrade migration if user upgraded to
heads
+_run_upgradedb(config, to_revision, session)
+
+
+def _resetdb_mysql(session: Session) -> None:
+"""Drop all Airflow tables for MySQL."""
+from sqlalchemy.orm import Session as SASession
+
+# MySQL: Release metadata locks and use AUTOCOMMIT for DDL
+log.info("MySQL: Releasing metadata locks before DDL operations")
+session.commit()
+session.close()
+
+# Use create_global_lock for migration safety (now handles MySQL with
AUTOCOMMIT)
+engine = settings.get_engine()
+lock_session = SASession(bind=engine)
+try:
+with (
+create_global_lock(session=lock_session, lock=DBLocks.MIGRATIONS),
+engine.connect() as connection,
+):
+ddl_conn =
connection.execution_options(isolation_level="AUTOCOMMIT")
+
+drop_airflow_models(ddl_conn)
+drop_airflow_moved_tables(ddl_conn)
+log.info("Dropped all Airflow tables")
+
+# Use raw Session to avoid scoped session issues
+work_session = SASession(bind=ddl_conn)
+try:
external_db_manager = RunDBManager()
-external_db_manager.upgradedb(session)
+external_db_manager.drop_tables(work_session, ddl_conn)
+finally:
+work_session.close()
+finally:
+lock_session.close()
-
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
jscheffl commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2647185128
##
airflow-core/pyproject.toml:
##
@@ -135,8 +135,8 @@ dependencies = [
"rich-argparse>=1.0.0",
"rich>=13.6.0",
"setproctitle>=1.3.3",
-# The issue tracking deprecations for sqlalchemy 2 is https://github.com/apache/airflow/issues/28723
-"sqlalchemy[asyncio]>=1.4.49",
+# SQLAlchemy >=2.0.36 fixes Python 3.13 TypingOnly import AssertionError caused by new typing attributes (__static_attributes__, __firstlineno__)
+"sqlalchemy[asyncio]>=2.0.36",
Review Comment:
One more thought: I think we are not affected, but I am not sure how much
custom code relying on SQLAlchemy could be. Airflow <=3.1 is pinned to
require SQLA<2, and if we merge it like this then Airflow >=3.2 will require
SQLA>=2. We had a similar breaking change with Pydantic; maybe more people
are affected there, but in that case we supported both v1 and v2 in parallel
for a while (yes, with the overhead of a CI test always running Pydantic v1
as well...).
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
potiuk commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2647159403
##
airflow-core/pyproject.toml:
##
@@ -135,8 +135,8 @@ dependencies = [
"rich-argparse>=1.0.0",
"rich>=13.6.0",
"setproctitle>=1.3.3",
-# The issue tracking deprecations for sqlalchemy 2 is https://github.com/apache/airflow/issues/28723
-"sqlalchemy[asyncio]>=1.4.49",
+# SQLAlchemy >=2.0.36 fixes Python 3.13 TypingOnly import AssertionError caused by new typing attributes (__static_attributes__, __firstlineno__)
+"sqlalchemy[asyncio]>=2.0.36",
Review Comment:
Yes. I think SQLAlchemy 1 should be dropped. We should have a short DISCUSS
thread on it with pros/cons from people who have opinions, and reach a
vote/consensus on it.
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2646700218
##
airflow-core/src/airflow/utils/db.py:
##
@@ -700,39 +721,102 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
-# Use AUTOCOMMIT for DDL to avoid metadata lock issues
-with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
-from alembic import command
+log.info("Stamping migration head")
+command.stamp(config, "head")
-log.info("Stamping migration head")
-command.stamp(config, "head")
+log.info("Airflow database tables created")
+
+
+def _create_db_from_orm(session):
+"""Create database tables from ORM models and stamp alembic version."""
+log.info("Creating Airflow database tables from the ORM")
+_setup_debug_logging_if_needed()
-log.info("Airflow database tables created")
+if get_dialect_name(session) == "mysql":
+_create_db_from_orm_mysql(session)
+else:
+_create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+import atexit
import faulthandler
-import threading
+from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
-# Enable Fault Handler
+# Enable faulthandler for debugging long-running threads and deadlocks,
+# but disable it before interpreter shutdown to avoid segfaults during
+# cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
-# Print Active Threads and Stack Traces Periodically
-def dump_stacks():
-while True:
-for thread_id, frame in sys._current_frames().items():
-log.info("\nThread %s stack:", thread_id)
-traceback.print_stack(frame)
-time.sleep(300)
+# Cancel any pending traceback dumps and disable faulthandler before exit
+# to prevent it from interfering with C extension cleanup
+def cleanup_faulthandler():
+with suppress(Exception):
+faulthandler.cancel_dump_traceback_later()
+with suppress(Exception):
+faulthandler.disable()
+
+atexit.register(cleanup_faulthandler)
+
+# Set up periodic traceback dumps for debugging hanging tests/threads
+faulthandler.dump_traceback_later(timeout=300, repeat=True, file=sys.stderr)
+
+
+@contextlib.contextmanager
+def _mysql_lock_session_for_migration(original_session: Session) -> Generator[Session, None, None]:
+"""
+Create a MySQL-specific lock session for migration operations.
+
+This context manager:
+1. Commits the original session to release metadata locks
+2. Creates a new session bound to the engine
+3. Ensures the session is properly closed on exit
+
+:param original_session: The original session to commit
+:return: A new session suitable for use with create_global_lock
+"""
+from sqlalchemy.orm import Session as SASession
+
+log.info("MySQL: Committing session to release metadata locks")
+original_session.commit()
+
+lock_session = SASession(bind=settings.engine)
+try:
+yield lock_session
+finally:
+lock_session.close()
-threading.Thread(target=dump_stacks, daemon=True).start()
+
+@contextlib.contextmanager
+def _single_connection_pool() -> Generator[None, None, None]:
+"""
+Temporarily reconfigure ORM to use exactly one connection.
+
+This is needed for migrations because some database engines hang forever
+trying to ALTER TABLEs when multiple connections exist in the pool.
+
+Saves and restores the AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE environment variable.
+"""
+import sqlalchemy.pool
+
+val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
Review Comment:
Fixed
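For readers unfamiliar with the `faulthandler` calls in the hunk above, the following sketch exercises the same enable, schedule, and cleanup sequence in isolation. It writes to a temporary file instead of `sys.stderr` (an assumption made so the example is harmless to run) and calls the cleanup function directly in addition to registering it with `atexit`.

```python
import atexit
import faulthandler
import tempfile
from contextlib import suppress

# Stand-in for sys.stderr; faulthandler needs a file with a real descriptor.
log_file = tempfile.TemporaryFile(mode="w+")

# Dump tracebacks for all threads on fatal errors (segfaults and similar).
faulthandler.enable(file=log_file, all_threads=True)
enabled_after_enable = faulthandler.is_enabled()

# Schedule a periodic dump every 300 seconds to help diagnose hangs.
faulthandler.dump_traceback_later(timeout=300, repeat=True, file=log_file)


def cleanup_faulthandler() -> None:
    """Cancel the pending dump and disable faulthandler, ignoring any errors."""
    with suppress(Exception):
        faulthandler.cancel_dump_traceback_later()
    with suppress(Exception):
        faulthandler.disable()


# Run the cleanup at interpreter exit; calling it twice is harmless.
atexit.register(cleanup_faulthandler)

cleanup_faulthandler()
enabled_after_cleanup = faulthandler.is_enabled()
```

Compared with the removed `dump_stacks` thread, `dump_traceback_later` needs no Python-level loop and can be cancelled explicitly before shutdown.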
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2646699841
##
airflow-core/src/airflow/utils/db.py:
##
@@ -700,39 +721,102 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
-# Use AUTOCOMMIT for DDL to avoid metadata lock issues
-with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
-from alembic import command
+log.info("Stamping migration head")
+command.stamp(config, "head")
-log.info("Stamping migration head")
-command.stamp(config, "head")
+log.info("Airflow database tables created")
+
+
+def _create_db_from_orm(session):
+"""Create database tables from ORM models and stamp alembic version."""
+log.info("Creating Airflow database tables from the ORM")
+_setup_debug_logging_if_needed()
-log.info("Airflow database tables created")
+if get_dialect_name(session) == "mysql":
+_create_db_from_orm_mysql(session)
+else:
+_create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+import atexit
import faulthandler
-import threading
+from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
-# Enable Fault Handler
+# Enable faulthandler for debugging long-running threads and deadlocks,
+# but disable it before interpreter shutdown to avoid segfaults during
+# cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
-# Print Active Threads and Stack Traces Periodically
-def dump_stacks():
-while True:
-for thread_id, frame in sys._current_frames().items():
-log.info("\nThread %s stack:", thread_id)
-traceback.print_stack(frame)
-time.sleep(300)
+# Cancel any pending traceback dumps and disable faulthandler before exit
+# to prevent it from interfering with C extension cleanup
+def cleanup_faulthandler():
+with suppress(Exception):
+faulthandler.cancel_dump_traceback_later()
+with suppress(Exception):
+faulthandler.disable()
+
+atexit.register(cleanup_faulthandler)
+
+# Set up periodic traceback dumps for debugging hanging tests/threads
+faulthandler.dump_traceback_later(timeout=300, repeat=True, file=sys.stderr)
+
+
+@contextlib.contextmanager
+def _mysql_lock_session_for_migration(original_session: Session) -> Generator[Session, None, None]:
+"""
+Create a MySQL-specific lock session for migration operations.
+
+This context manager:
+1. Commits the original session to release metadata locks
+2. Creates a new session bound to the engine
+3. Ensures the session is properly closed on exit
+
+:param original_session: The original session to commit
+:return: A new session suitable for use with create_global_lock
+"""
+from sqlalchemy.orm import Session as SASession
+
+log.info("MySQL: Committing session to release metadata locks")
+original_session.commit()
+
+lock_session = SASession(bind=settings.engine)
+try:
+yield lock_session
+finally:
+lock_session.close()
-threading.Thread(target=dump_stacks, daemon=True).start()
+
+@contextlib.contextmanager
+def _single_connection_pool() -> Generator[None, None, None]:
+"""
+Temporarily reconfigure ORM to use exactly one connection.
+
+This is needed for migrations because some database engines hang forever
+trying to ALTER TABLEs when multiple connections exist in the pool.
+
+Saves and restores the AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE environment variable.
+"""
+import sqlalchemy.pool
+
+val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
+try:
+os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE"] = "1"
+settings.reconfigure_orm(pool_class=sqlalchemy.pool.SingletonThreadPool)
+yield
+finally:
+if val is None:
+os.environ.pop("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE", None)
Review Comment:
Fixed
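The save-and-restore behaviour that `_single_connection_pool` documents can be sketched without any ORM involvement. The helper name `temporary_env` and the variable `EXAMPLE_POOL_MAX_SIZE` are hypothetical; the point is only that an initially unset variable must be removed again rather than left behind.

```python
import contextlib
import os
from collections.abc import Generator


@contextlib.contextmanager
def temporary_env(name: str, value: str) -> Generator[None, None, None]:
    """Set an environment variable for the duration of the block, then restore it."""
    previous = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        if previous is None:
            # The variable did not exist before, so remove it again.
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous


VAR = "EXAMPLE_POOL_MAX_SIZE"  # hypothetical variable name
os.environ.pop(VAR, None)

with temporary_env(VAR, "1"):
    inside_when_unset = os.environ[VAR]
absent_afterwards = VAR not in os.environ

os.environ[VAR] = "5"
with temporary_env(VAR, "1"):
    inside_when_set = os.environ[VAR]
restored_value = os.environ[VAR]
os.environ.pop(VAR, None)
```

Placing the restore logic in `finally` means it also runs when the body raises, which matters for migrations that can fail midway.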
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2646699200
##
airflow-core/src/airflow/utils/db_manager.py:
##
@@ -48,6 +49,21 @@ def __init__(self, session):
super().__init__()
self.session = session
+def _release_mysql_metadata_locks(self) -> None:
+"""
+Release MySQL metadata locks by committing the session.
+
+MySQL requires metadata locks to be released before DDL operations.
+This is done by committing the current transaction.
+This method is a no-op for non-MySQL databases.
+"""
+if get_dialect_name(self.session) != "mysql":
+return
Review Comment:
The method is not MySQL-specific in its usage pattern; it's a utility method
that happens to only do work for MySQL today. Removing the check would
require adding dialect checks at all 4 call sites (which were removed in
response to your previous comment).
I renamed the method to `_release_metadata_locks_if_needed` so that:
- Generic name matches the generic usage pattern (called for all dialects)
- `_if_needed` suffix clearly signals conditional behavior
- Docstring already explains it's MySQL-specific internally
- Future-proof: if another DB ever needs similar handling, the name still works
- No cognitive dissonance from calling a "mysql" method on another dialect
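The trade-off described in this comment (one guarded helper versus a dialect check at every call site) can be sketched as follows. `FakeSession`, `ExampleDBManager`, and the `dialect_name` attribute are hypothetical stand-ins; the real method uses `get_dialect_name(self.session)` as shown in the diff.

```python
class FakeSession:
    """Hypothetical session that only counts commits."""

    def __init__(self, dialect_name: str) -> None:
        self.dialect_name = dialect_name
        self.commit_count = 0

    def commit(self) -> None:
        self.commit_count += 1


class ExampleDBManager:
    """Illustrative manager; not Airflow's actual class."""

    def __init__(self, session: FakeSession) -> None:
        self.session = session

    def _release_metadata_locks_if_needed(self) -> None:
        # No-op for every dialect except MySQL, so callers need no check.
        if self.session.dialect_name != "mysql":
            return
        self.session.commit()

    def create_tables(self) -> None:
        # Call sites stay dialect-agnostic; the guard lives in one place.
        self._release_metadata_locks_if_needed()
        # DDL statements would follow here.


mysql_manager = ExampleDBManager(FakeSession("mysql"))
postgres_manager = ExampleDBManager(FakeSession("postgresql"))
mysql_manager.create_tables()
postgres_manager.create_tables()

mysql_commits = mysql_manager.session.commit_count
postgres_commits = postgres_manager.session.commit_count
```

Keeping the guard inside the helper means a future dialect with similar needs requires a change in one method only.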
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2646633318
##
airflow-core/src/airflow/utils/db_manager.py:
##
@@ -48,6 +49,21 @@ def __init__(self, session):
super().__init__()
self.session = session
+def _release_mysql_metadata_locks(self) -> None:
+"""
+Release MySQL metadata locks by committing the session.
+
+MySQL requires metadata locks to be released before DDL operations.
+This is done by committing the current transaction.
+This method is a no-op for non-MySQL databases.
+"""
+if get_dialect_name(self.session) != "mysql":
+return
Review Comment:
The method is not MySQL-specific in its usage pattern; it's a utility method
that happens to only do work for MySQL. Removing the check would require
adding dialect checks at all 4 call sites (which were removed in response to
your previous comment).
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2646634110
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1340,58 +1498,108 @@ def __str__(self):
@contextlib.contextmanager
-def create_global_lock(
-session: Session,
-lock: DBLocks,
-lock_timeout: int = 1800,
+def _create_global_lock_mysql(lock: DBLocks, lock_timeout: int) -> Generator[None, None, None]:
+"""
+Create a global advisory lock for MySQL.
+
+Uses a dedicated AUTOCOMMIT connection because:
+- GET_LOCK is session-level, not transaction-level
+- DDL operations cause implicit commits that would break transaction wrappers
+"""
+server_version = settings.get_engine().dialect.server_version_info
+if not (server_version and server_version >= (5, 6)):
Review Comment:
Fixed.
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1256,14 +1406,16 @@ def _handle_fab_downgrade(*, session: Session) -> None:
fab_version,
)
return
-connection = settings.get_engine().connect()
-insp = inspect(connection)
-if not fab_version and insp.has_table("ab_user"):
-log.info(
-"FAB migration version not found, but FAB tables exist. "
-"FAB provider is not required for downgrade.",
-)
-return
+
+# Use context manager to ensure connection is closed
Review Comment:
Fixed
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2646633318
##
airflow-core/src/airflow/utils/db_manager.py:
##
@@ -48,6 +49,21 @@ def __init__(self, session):
super().__init__()
self.session = session
+def _release_mysql_metadata_locks(self) -> None:
+"""
+Release MySQL metadata locks by committing the session.
+
+MySQL requires metadata locks to be released before DDL operations.
+This is done by committing the current transaction.
+This method is a no-op for non-MySQL databases.
+"""
+if get_dialect_name(self.session) != "mysql":
+return
Review Comment:
The method is not MySQL-specific in its usage pattern; it's a utility method that happens to only do work for MySQL. Removing the check would require adding dialect checks at all 4 call sites (which were removed in response to your previous comment).
As a compromise, I renamed it to `_release_metadata_locks_if_needed`, which makes its conditional nature explicit.
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Nataneljpwd commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2646187195
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1256,14 +1406,16 @@ def _handle_fab_downgrade(*, session: Session) -> None:
fab_version,
)
return
-connection = settings.get_engine().connect()
-insp = inspect(connection)
-if not fab_version and insp.has_table("ab_user"):
-log.info(
-"FAB migration version not found, but FAB tables exist. "
-"FAB provider is not required for downgrade.",
-)
-return
+
+# Use context manager to ensure connection is closed
Review Comment:
Do we need this comment? To me, at least, it seems obvious that using a
context manager for a connection ensures it is closed when the block finishes.
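To illustrate the point about context managers and connection cleanup, here is a small sketch. It swaps in the standard-library `sqlite3` module (with `contextlib.closing`) instead of the SQLAlchemy engine connection used in the PR, so that it runs without extra dependencies; the `table_exists` helper is hypothetical.

```python
import sqlite3
from contextlib import closing


def table_exists(connection, table_name):
    """Hypothetical helper: check sqlite_master for a table with this name."""
    row = connection.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table_name,),
    ).fetchone()
    return row is not None


# closing() guarantees connection.close() runs when the block exits,
# whether it exits normally or through an exception.
with closing(sqlite3.connect(":memory:")) as connection:
    connection.execute("CREATE TABLE ab_user (id INTEGER PRIMARY KEY)")
    found = table_exists(connection, "ab_user")

# After the block, the connection can no longer be used.
try:
    connection.execute("SELECT 1")
    still_open = True
except sqlite3.ProgrammingError:
    still_open = False
```

Note that a bare `sqlite3` connection used as a context manager only commits or rolls back and does not close, which is why `closing` is used in this sketch.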
##
airflow-core/src/airflow/utils/db.py:
##
@@ -700,39 +721,102 @@ def _create_db_from_orm(session):
log.info("Getting alembic config")
config = _get_alembic_config()
-# Use AUTOCOMMIT for DDL to avoid metadata lock issues
-with AutocommitEngineForMySQL(): # TODO: enable for sqlite too
-from alembic import command
+log.info("Stamping migration head")
+command.stamp(config, "head")
-log.info("Stamping migration head")
-command.stamp(config, "head")
+log.info("Airflow database tables created")
+
+
+def _create_db_from_orm(session):
+"""Create database tables from ORM models and stamp alembic version."""
+log.info("Creating Airflow database tables from the ORM")
+_setup_debug_logging_if_needed()
-log.info("Airflow database tables created")
+if get_dialect_name(session) == "mysql":
+_create_db_from_orm_mysql(session)
+else:
+_create_db_from_orm_default(session)
def _setup_debug_logging_if_needed():
"""Set up debug logging and stack trace dumping if SQLALCHEMY_ENGINE_DEBUG is set."""
if not os.environ.get("SQLALCHEMY_ENGINE_DEBUG"):
return
+import atexit
import faulthandler
-import threading
+from contextlib import suppress
# Enable SQLA debug logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG)
-# Enable Fault Handler
+# Enable faulthandler for debugging long-running threads and deadlocks,
+# but disable it before interpreter shutdown to avoid segfaults during
+# cleanup (especially with SQLAlchemy 2.0 + pytest teardown)
faulthandler.enable(file=sys.stderr, all_threads=True)
-# Print Active Threads and Stack Traces Periodically
-def dump_stacks():
-while True:
-for thread_id, frame in sys._current_frames().items():
-log.info("\nThread %s stack:", thread_id)
-traceback.print_stack(frame)
-time.sleep(300)
+# Cancel any pending traceback dumps and disable faulthandler before exit
+# to prevent it from interfering with C extension cleanup
+def cleanup_faulthandler():
+with suppress(Exception):
+faulthandler.cancel_dump_traceback_later()
+with suppress(Exception):
+faulthandler.disable()
+
+atexit.register(cleanup_faulthandler)
+
+# Set up periodic traceback dumps for debugging hanging tests/threads
+faulthandler.dump_traceback_later(timeout=300, repeat=True, file=sys.stderr)
+
+
+@contextlib.contextmanager
+def _mysql_lock_session_for_migration(original_session: Session) -> Generator[Session, None, None]:
+"""
+Create a MySQL-specific lock session for migration operations.
+
+This context manager:
+1. Commits the original session to release metadata locks
+2. Creates a new session bound to the engine
+3. Ensures the session is properly closed on exit
+
+:param original_session: The original session to commit
+:return: A new session suitable for use with create_global_lock
+"""
+from sqlalchemy.orm import Session as SASession
+
+log.info("MySQL: Committing session to release metadata locks")
+original_session.commit()
+
+lock_session = SASession(bind=settings.engine)
+try:
+yield lock_session
+finally:
+lock_session.close()
-threading.Thread(target=dump_stacks, daemon=True).start()
+
+@contextlib.contextmanager
+def _single_connection_pool() -> Generator[None, None, None]:
+"""
+Temporarily reconfigure ORM to use exactly one connection.
+
+This is needed for migrations because some database engines hang forever
+trying to ALTER TABLEs when multiple connections exist in the pool.
+
+Saves and restores the AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE environment variable.
+"""
+import sqlalchemy.pool
+
+val = os.environ.get("AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE")
+try:
+os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_SIZE"] = "1"
+
settings.reconfigure
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2645590147
##
airflow-core/pyproject.toml:
##
@@ -135,8 +135,8 @@ dependencies = [
"rich-argparse>=1.0.0",
"rich>=13.6.0",
"setproctitle>=1.3.3",
-# The issue tracking deprecations for sqlalchemy 2 is https://github.com/apache/airflow/issues/28723
-"sqlalchemy[asyncio]>=1.4.49",
+# SQLAlchemy >=2.0.36 fixes Python 3.13 TypingOnly import AssertionError caused by new typing attributes (__static_attributes__, __firstlineno__)
+"sqlalchemy[asyncio]>=2.0.36",
Review Comment:
Jarek [mentioned in the past that](https://github.com/apache/airflow/pull/56212#issuecomment-3488150222):
> _There is nothing **holding** us with SQLalchemy 1 for Airflow 3.2_
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2645498473
##
airflow-core/pyproject.toml:
##
@@ -135,8 +135,8 @@ dependencies = [
"rich-argparse>=1.0.0",
"rich>=13.6.0",
"setproctitle>=1.3.3",
-# The issue tracking deprecations for sqlalchemy 2 is https://github.com/apache/airflow/issues/28723
-"sqlalchemy[asyncio]>=1.4.49",
+# SQLAlchemy >=2.0.36 fixes Python 3.13 TypingOnly import AssertionError caused by new typing attributes (__static_attributes__, __firstlineno__)
+"sqlalchemy[asyncio]>=2.0.36",
Review Comment:
That's a good question. Perhaps @potiuk or @vincbeck know how to answer this.
Conceivably, we can set it to
```toml
"sqlalchemy[asyncio]>=1.4.49; python_version < '3.13'",
"sqlalchemy[asyncio]>=2.0.36; python_version >= '3.13'",
```
...although in that case mypy will keep running on 1.4, and this PR will have achieved little.
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
jscheffl commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2645417969
##
airflow-core/pyproject.toml:
##
@@ -135,8 +135,8 @@ dependencies = [
"rich-argparse>=1.0.0",
"rich>=13.6.0",
"setproctitle>=1.3.3",
-# The issue tracking deprecations for sqlalchemy 2 is https://github.com/apache/airflow/issues/28723
-"sqlalchemy[asyncio]>=1.4.49",
+# SQLAlchemy >=2.0.36 fixes Python 3.13 TypingOnly import AssertionError caused by new typing attributes (__static_attributes__, __firstlineno__)
+"sqlalchemy[asyncio]>=2.0.36",
Review Comment:
Does it mean with this PR we force upgrading to sqlalchemy 2.x and do _not_ support 1.x anymore? Is this the desired state or do we leave integrations behind which still rely on 1.x?
(I was not involved in the discussions, so just asking if we want to force it then with Airflow 3.2)
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2645326449
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1120,36 +1195,47 @@ def upgradedb(
exit(1)
if not _get_current_revision(session=session) and not to_revision:
-# Don't load default connections
# New DB; initialize and exit
initdb(session=session)
return
-with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
-import sqlalchemy.pool
+if settings.get_engine().dialect.name == "mysql":
Review Comment:
Fixed locally, changes will be visible on the next rebase.
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2645325440
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1236,7 +1372,7 @@ def _get_fab_migration_version(*, session: Session) -> str | None:
return None
-def _handle_fab_downgrade(*, session: Session) -> None:
+def _handle_fab_downgrade(*, session: Session, is_mysql: bool) -> None:
Review Comment:
Fixed locally, changes will be visible on the next rebase.
##
airflow-core/src/airflow/utils/db.py:
##
@@ -685,12 +685,32 @@ def _create_db_from_orm(session):
log.info("Creating Airflow database tables from the ORM")
-# Debug setup if requested
_setup_debug_logging_if_needed()
-log.info("Creating context")
+if get_dialect_name(session) == "mysql":
Review Comment:
Fixed locally, changes will be visible on the next rebase.
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2645326879
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1345,18 +1490,59 @@ def create_global_lock(
lock: DBLocks,
lock_timeout: int = 1800,
) -> Generator[None, None, None]:
-"""Contextmanager that will create and teardown a global db lock."""
-bind = session.get_bind()
-if hasattr(bind, "connect"):
-conn = bind.connect()
-else:
-conn = bind
+"""
+Contextmanager that will create and teardown a global db lock.
+
+For MySQL, uses a dedicated AUTOCOMMIT connection because:
+- GET_LOCK is session-level, not transaction-level
+- DDL operations cause implicit commits that would break transaction wrappers
+
+For PostgreSQL, uses transactional advisory locks as before.
+"""
dialect_name = get_dialect_name(session)
-try:
-if dialect_name == "postgresql":
+
+if dialect_name == "mysql":
Review Comment:
Fixed locally, changes will be visible after the next rebase.
##
airflow-core/src/airflow/utils/db_manager.py:
##
@@ -48,6 +49,24 @@ def __init__(self, session):
super().__init__()
self.session = session
+def _is_mysql(self) -> bool:
+"""Check if the database is MySQL."""
+return get_dialect_name(self.session) == "mysql"
+
+def _release_metadata_locks(self) -> None:
Review Comment:
Fixed locally, changes will be visible after the next rebase.
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Dev-iL commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2645325965
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1163,17 +1249,52 @@ def resetdb(session: Session = NEW_SESSION, skip_init: bool = False):
import_all_models()
-connection = settings.engine.connect()
+if get_dialect_name(session) == "mysql":
Review Comment:
Fixed locally, changes will be visible on the next rebase.
Re: [PR] Upgrade SQLAlchemy (SQLA) to 2.0 [airflow]
Nataneljpwd commented on code in PR #59218:
URL: https://github.com/apache/airflow/pull/59218#discussion_r2645209669
##
airflow-core/src/airflow/utils/db.py:
##
@@ -685,12 +685,32 @@ def _create_db_from_orm(session):
log.info("Creating Airflow database tables from the ORM")
-# Debug setup if requested
_setup_debug_logging_if_needed()
-log.info("Creating context")
+if get_dialect_name(session) == "mysql":
Review Comment:
Same here: there is quite a bit of MySQL-specific logic. Maybe it is better
to move it to a separate method?
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1120,36 +1195,47 @@ def upgradedb(
exit(1)
if not _get_current_revision(session=session) and not to_revision:
-# Don't load default connections
# New DB; initialize and exit
initdb(session=session)
return
-with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
-import sqlalchemy.pool
+if settings.get_engine().dialect.name == "mysql":
Review Comment:
Same here: there is quite a bit of MySQL-specific logic. It could be worth
moving it to another method.
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1236,7 +1372,7 @@ def _get_fab_migration_version(*, session: Session) -> str | None:
return None
-def _handle_fab_downgrade(*, session: Session) -> None:
+def _handle_fab_downgrade(*, session: Session, is_mysql: bool) -> None:
Review Comment:
Are we sure we want to use a boolean here and change the API of the method,
instead of having a separate function for the MySQL case and an if statement
at the call site to decide which one to call?
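The alternative suggested here can be sketched as follows: keep the original signature untouched, put the MySQL path in its own function, and choose between the two at the call site. All function names and return values below are hypothetical and only illustrate the shape of the refactoring, not the code that ended up in the PR.

```python
def _handle_fab_downgrade_default() -> str:
    # Hypothetical default path (e.g. PostgreSQL, SQLite).
    return "default downgrade path"


def _handle_fab_downgrade_mysql() -> str:
    # Hypothetical MySQL path, e.g. one that releases metadata locks first.
    return "mysql downgrade path"


def run_fab_downgrade(dialect_name: str) -> str:
    """Hypothetical call site: one dialect check, no boolean parameter."""
    if dialect_name == "mysql":
        return _handle_fab_downgrade_mysql()
    return _handle_fab_downgrade_default()
```

The trade-off is one extra function in exchange for a stable signature and no `is_mysql` flag threaded through callers.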
##
airflow-core/src/airflow/utils/db_manager.py:
##
@@ -48,6 +49,24 @@ def __init__(self, session):
super().__init__()
self.session = session
+def _is_mysql(self) -> bool:
+"""Check if the database is MySQL."""
+return get_dialect_name(self.session) == "mysql"
+
+def _release_metadata_locks(self) -> None:
Review Comment:
If this method is MySQL-specific, maybe it's a good idea to include MySQL in
the method name.
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1163,17 +1249,52 @@ def resetdb(session: Session = NEW_SESSION, skip_init: bool = False):
import_all_models()
-connection = settings.engine.connect()
+if get_dialect_name(session) == "mysql":
Review Comment:
There is quite a bit of MySQL-specific logic here; maybe it is a good idea
to separate it into another method.
##
airflow-core/src/airflow/utils/db.py:
##
@@ -1345,18 +1490,59 @@ def create_global_lock(
lock: DBLocks,
lock_timeout: int = 1800,
) -> Generator[None, None, None]:
-"""Contextmanager that will create and teardown a global db lock."""
-bind = session.get_bind()
-if hasattr(bind, "connect"):
-conn = bind.connect()
-else:
-conn = bind
+"""
+Contextmanager that will create and teardown a global db lock.
+
+For MySQL, uses a dedicated AUTOCOMMIT connection because:
+- GET_LOCK is session-level, not transaction-level
+- DDL operations cause implicit commits that would break transaction wrappers
+
+For PostgreSQL, uses transactional advisory locks as before.
+"""
dialect_name = get_dialect_name(session)
-try:
-if dialect_name == "postgresql":
+
+if dialect_name == "mysql":
Review Comment:
Same here: a lot of MySQL-specific code that could be separated into a
different method, improving readability.
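As a rough illustration of what such an extracted MySQL helper might look like, the sketch below acquires a named lock with `GET_LOCK` on a dedicated connection and always releases it with `RELEASE_LOCK`. `RecordingConnection`, `mysql_global_lock`, and the `connect` factory argument are hypothetical; a real implementation would use an actual AUTOCOMMIT database connection rather than this in-memory stand-in.

```python
import contextlib


class RecordingConnection:
    """Hypothetical stand-in for a dedicated AUTOCOMMIT MySQL connection."""

    def __init__(self, lock_available=True):
        self.lock_available = lock_available
        self.statements = []
        self.closed = False

    def execute(self, sql, params=()):
        self.statements.append(sql)
        # MySQL's GET_LOCK returns 1 on success and 0 on timeout.
        if sql.startswith("SELECT GET_LOCK"):
            return 1 if self.lock_available else 0
        return 1

    def close(self):
        self.closed = True


@contextlib.contextmanager
def mysql_global_lock(connect, lock_name, lock_timeout):
    """Hold a session-level named lock for the duration of the block."""
    conn = connect()  # dedicated connection, so the lock outlives DDL implicit commits
    try:
        acquired = conn.execute("SELECT GET_LOCK(%s, %s)", (lock_name, lock_timeout))
        if acquired != 1:
            raise TimeoutError(f"Could not acquire lock {lock_name!r}")
        try:
            yield
        finally:
            # Release even if the protected block raised.
            conn.execute("SELECT RELEASE_LOCK(%s)", (lock_name,))
    finally:
        conn.close()
```

A generic `create_global_lock` could then dispatch to a helper like this when the dialect is MySQL and keep the advisory-lock path for PostgreSQL.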
