Hey list - I’m posting this to get some ideas on what might be happening, as it may or may not have some impact on OpenStack if and when we move to MySQL drivers that are async-patchable, like MySQL-connector or PyMySQL. A user posted this issue a few days ago, and I’ve since distilled it into separate test cases for PyMySQL and MySQL-connector. It uses gevent, not eventlet, so I’m not really sure if this applies. But there are plenty of very smart people here, so if anyone can shed some light on what is actually happening, that would help.

The program essentially illustrates code that performs several steps upon a connection. If the greenlet is suddenly killed, the state of the connection, while damaged, is still allowed to continue on in some way, and what’s super-catastrophic here is that you see a transaction actually being committed *without* all of its statements having proceeded on it.

In my work with MySQL drivers, I’ve noted for years that they are all very, very bad at dealing with concurrency-related issues. The whole “MySQL has gone away” and “commands out of sync” errors are ones that we’ve all just drowned in, and so often these are due to the driver getting mixed up by concurrent use of a connection. However, this one seems more insidious. At the same time, the script has some complexity of its own (like a simplistic connection pool), and I’m not really sure where the core of the issue lies.

The script is at https://gist.github.com/zzzeek/d196fa91c40cb515365e and also below. If you run it for a few seconds, go over to your MySQL command line and run this query:

    SELECT * FROM table_b WHERE a_id not in (SELECT id FROM table_a) ORDER BY a_id DESC;

What you’ll see is tons of rows in table_b where the “a_id” is zero (because cursor.lastrowid fails), but the *rows are committed*. If you read the segment of code that does this, it should be impossible:

    connection = pool.get()
    rowid = execute_sql(
        connection,
        "INSERT INTO table_a (data) VALUES (%s)", ("a",)
    )

    gevent.sleep(random.random() * 0.2)
    try:
        execute_sql(
            connection,
            "INSERT INTO table_b (a_id, data) VALUES (%s, %s)",
            (rowid, "b",)
        )
        connection.commit()
        pool.return_conn(connection)

    except Exception:
        connection.rollback()
        pool.return_conn(connection)

So if the gevent.sleep() throws a timeout error, somehow we are getting thrown back in there, with the connection in an invalid state, but not invalid enough to stop the commit.
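
One detail that may matter when reading the segment above: as far as I know, gevent.Timeout derives from BaseException rather than Exception, so the inner "except Exception" would not run rollback() when the timeout fires inside that try block. This does not by itself explain the committed rows, but it does show which cleanup path runs. Below is a minimal, stdlib-only sketch of that control flow. FakeTimeout, FakeConnection and run_transaction are hypothetical stand-ins for illustration, not gevent or driver APIs.

```python
class FakeTimeout(BaseException):
    """Hypothetical stand-in for gevent.Timeout (assumed to subclass BaseException)."""


class FakeConnection(object):
    """Hypothetical connection that only records the calls made on it."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


def run_transaction(connection, interrupt_with):
    """Mimic the nested try/except of the worker.

    interrupt_with is raised where the second INSERT would run;
    pass None for an uninterrupted transaction.
    """
    try:
        try:
            if interrupt_with is not None:
                raise interrupt_with
            connection.commit()
        except Exception:
            # only reached for Exception subclasses
            connection.rollback()
            return "inner"
    except FakeTimeout:
        # the timeout bypasses the inner handler, so no rollback() was issued
        return "outer"
    return "ok"


conn_error = FakeConnection()
result_error = run_transaction(conn_error, ValueError("driver error"))

conn_timeout = FakeConnection()
result_timeout = run_transaction(conn_timeout, FakeTimeout())

conn_ok = FakeConnection()
result_ok = run_transaction(conn_ok, None)

print(result_error, conn_error.calls)
print(result_timeout, conn_timeout.calls)
print(result_ok, conn_ok.calls)
```

In this sketch an ordinary error is rolled back by the inner handler, while the timeout stand-in reaches only the outer handler and leaves the connection with no rollback() recorded. In the real script the pool's return_conn() is what would then have to clean up that connection.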

If a simple check for “SELECT connection_id()” is added, this query fails and the whole issue is prevented. Additionally, if you put a foreign key constraint on table_b.a_id, then the issue is prevented, and you see that the constraint violation is happening all over the place within the commit() call. It’s as though the connection’s state began just after the gevent.sleep() call.

Now, there’s also a very rudimentary connection pool here. That is also part of what’s going on. If I try to run without the pool, the whole script just runs out of connections, fast, which suggests that this gevent timeout cleans itself up very, very badly. However, SQLAlchemy’s pool works a lot like this one, so if folks here can tell me whether the connection pool is doing something bad, then that’s key, because I need to make a comparable change in SQLAlchemy’s pool. Otherwise I worry our eventlet use could have big problems under high load.

# -*- coding: utf-8 -*-
import gevent.monkey
gevent.monkey.patch_all()

import collections
import threading
import time
import random
import sys

import logging
logging.basicConfig()
log = logging.getLogger('foo')
log.setLevel(logging.DEBUG)

#import pymysql as dbapi
from mysql import connector as dbapi


class SimplePool(object):
    def __init__(self):
        self.checkedin = collections.deque([
            self._connect() for i in range(50)
        ])
        self.checkout_lock = threading.Lock()
        self.checkin_lock = threading.Lock()

    def _connect(self):
        return dbapi.connect(
            user="scott", passwd="tiger",
            host="localhost", db="test")

    def get(self):
        with self.checkout_lock:
            while not self.checkedin:
                time.sleep(.1)
            return self.checkedin.pop()

    def return_conn(self, conn):
        try:
            conn.rollback()
        except:
            log.error("Exception during rollback", exc_info=True)
        try:
            conn.close()
        except:
            log.error("Exception during close", exc_info=True)

        # recycle to a new connection
        conn = self._connect()
        with self.checkin_lock:
            self.checkedin.append(conn)


def verify_connection_id(conn):
    cursor = conn.cursor()
    try:
        cursor.execute("select connection_id()")
        row = cursor.fetchone()
        return row
    except:
        return None
    finally:
        cursor.close()


def execute_sql(conn, sql, params=()):
    cursor = conn.cursor()
    cursor.execute(sql, params)
    lastrowid = cursor.lastrowid
    cursor.close()
    return lastrowid


pool = SimplePool()

# SELECT * FROM table_b WHERE a_id not in
# (SELECT id FROM table_a) ORDER BY a_id DESC;

PREPARE_SQL = [
    "DROP TABLE IF EXISTS table_b",
    "DROP TABLE IF EXISTS table_a",
    """CREATE TABLE table_a (
        id INT NOT NULL AUTO_INCREMENT,
        data VARCHAR (256) NOT NULL,
        PRIMARY KEY (id)
    ) engine='InnoDB'""",
    """CREATE TABLE table_b (
        id INT NOT NULL AUTO_INCREMENT,
        a_id INT NOT NULL,
        data VARCHAR (256) NOT NULL,
        -- uncomment this to illustrate where the driver is attempting
        -- to INSERT the row during ROLLBACK
        -- FOREIGN KEY (a_id) REFERENCES table_a(id),
        PRIMARY KEY (id)
    ) engine='InnoDB'
    """]

connection = pool.get()
for sql in PREPARE_SQL:
    execute_sql(connection, sql)
connection.commit()
pool.return_conn(connection)
print("Table prepared...")


def transaction_kill_worker():
    while True:
        try:
            connection = None
            with gevent.Timeout(0.1):
                connection = pool.get()
                rowid = execute_sql(
                    connection,
                    "INSERT INTO table_a (data) VALUES (%s)", ("a",))
                gevent.sleep(random.random() * 0.2)

                try:
                    execute_sql(
                        connection,
                        "INSERT INTO table_b (a_id, data) VALUES (%s, %s)",
                        (rowid, "b",))

                    # this version prevents the commit from
                    # proceeding on a bad connection
                    # if verify_connection_id(connection):
                    #     connection.commit()

                    # this version does not.  It will commit the
                    # row for table_b without the table_a being present.
                    connection.commit()

                    pool.return_conn(connection)
                except Exception:
                    connection.rollback()
                    pool.return_conn(connection)
                    sys.stdout.write("$")
        except gevent.Timeout:
            # try to return the connection anyway
            if connection is not None:
                pool.return_conn(connection)
            sys.stdout.write("#")
        except Exception:
            # logger.exception(e)
            sys.stdout.write("@")
        else:
            sys.stdout.write(".")
        finally:
            if connection is not None:
                pool.return_conn(connection)


def main():
    for i in range(50):
        gevent.spawn(transaction_kill_worker)

    gevent.sleep(3)

    while True:
        gevent.sleep(5)


if __name__ == "__main__":
    main()

_______________________________________________
OpenStack-dev mailing list
OpenStackemail@example.com
http://lists.openstack.org/cgi-bin/mailman/listinfo/openstack-dev