Hello list,
My schema has changed, and now I want to retrieve my old data (of the old
schema) to the new database (with the new schema) from csv files (I export the
tables of the old database to csv files and then load those csv files to the
new database with some column mapping and some gap filling for new columns)
The problem is that my new schema have a UniqueConstraint for some tables (like
: the name column of the city table should be unique within a country). This
constraint was not present in the old schema and data is corrupt (two cities
with the same name in the same country). So when I try to insert them in the
new database, I have IntegrityErrors.
The solution I thought of was :
* Catch the IntegrityError exception
* If it's a problem on a UniqueConstraint, then the exception was raised
because I tried to insert instance B that has the same "key" as instance A that
was inserted before.
* So for all children of B (B's relations), set their parent to A. For
example, for all citizens of B, set their city to A, beause A and B ought to be
the same.
* Then, safely ignore B and move on to the next instance.
Here's what has been done so far (that works, I just use psuedo code for
"illustration" purpose. If necessary, you can look at the actual attached
source files):
line = csvloader.next_row()
ModelClass = get_current_model()
instance = ModelClass.create_instance(**(to_dict(line)))
session.add(instance)
I wish I could do something like this :
try:
session.commit()
except IntegrityError,e :
session.rollback()
errror = get_error()
if type_of(error) == UniqueConstraintError :
original_instance = ModelClass.get(instance.id)
for relation in instance.get_relations() :
# Is this correct ?
instance.relation.inverse = original_instance
session.commit()
My questions are : how to write get_error, type_of, where to get
UniqueContraintError, how to write get_relations, how to set the inverse of a
relation (is instance.realtion.inverse the right thing to set ?) and is this
approach correct ?
--
You received this message because you are subscribed to the Google Groups
"sqlalchemy" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to
[email protected].
For more options, visit this group at
http://groups.google.com/group/sqlalchemy?hl=en.
#!/usr/bin/env python
from coriolis.client.backend.db import DatabaseConnection
from coriolis.models import *
# heh
import sys
dbcnx = DatabaseConnection(env="test")
mapping = {"print_id":"medium_id",
"categ_id":"category_id",
"city_id":"daira_id",
"side_a":"face_a_id",
"side_b":"face_b_id"
}
def load_all():
dbcnx.init(reset=True,init_data=True)
load_list = "wilaya,daira,commune,street_furniture,media_type,print_type,face_type,lesser,convention,site,"
load_list += "convention_installment,convention_return,media,face"
load_list = load_list.split(",")
for to_load in load_list :
dbcnx.load_data(to_load,mapping)
StreetFurniture.get_by(name="PANNEAUX").remove()
print "The self checking shall begin. Would you like to skip this operation [y/n] ?"
if sys.stdin.readline().strip() in "y,Y".split(","):
return
print "This is your last chance. Press any key to start or ctrl-c to abort (data has already been commited, don't worry about that)."
sys.stdin.readline()
for klass in Lesser, Convention, ConventionInstallment, Site, Media:
print "self cheking the elements of",klass.__name__
for instance in klass.query.all():
print "cheking",repr(instance)
instance.self_check()
def load_one(data):
dbcnx.init()
dbcnx.load_data(data,mapping)
if len(sys.argv) > 1:
load_one(sys.argv[1])
else:
load_all()
dbcnx.commit()
# Coriolis
from coriolis.client.utils.config import AppConfig
from coriolis import models
from coriolis.models import *
from coriolis.client.utils import Singleton,get_csv,get_logger, db_available
from coriolis.client.core.database import BaseDB
# ORM
import elixir
# Python
from csv import reader as csvreader
class DatabaseConnection(BaseDB,object):
__metaclass__ = Singleton
session = elixir.session
def __init__(self,config=None,env="test",reset=False,init_data=False):
"""
env should be one of : default/prod, test, user
This will match self.config.default_conf, self.config.user_conf, self.config.test_conf etc.
"""
self.config = config
if not config :
app_config = AppConfig()
self.config = getattr(app_config,env+"_conf")
elixir.metadata.bind = self.config.backend.database.url
self.session.bind = elixir.metadata.bind
self.initialized = False
self.mapped = False # boolean
print "working on ",elixir.metadata.bind
## This breaks coriolis at startup if the databases are not available
## IT SHOULD NOT HAPPEN BEFORE THE FIRST VIEW IS CALLED!!!!
## AND IT SHOULD GIVE FEEDBACK (using utils.alerts.Alerts methods)
## See coriolis/client/core/gtkui/ui.py:L166
# if reset:
# self.reset(init_data)
# self.map_models(True)
# self.commit()
def init(self,reset=False,init_data=False):
if reset:
self.reset(init_data)
self.map_models(True)
self.commit()
def reset(self,init_data=False):
self.empty()
if init_data:
self.init_data()
@db_available
def map_models(self,create=False):
if self.mapped :
return
# When set to True, the create parameter creates tables if they do not exist
# OBJECTION: creating tables on the fly is NOT Coriolis's job, it's the developer's.
# Move this code to test
elixir.setup_all(create)
self.mapped = True
print "map_models ok."
return True
def get_last_error_message(self):
"""
Methods decorated with the db_available will store the last_error_message variable as an attribute,
so that it can be accessed later. See a usage example in DatabaseConneciton::load_from_csv
"""
# This is set by the db_available decorator if an error is raised.
return self.last_error_message
@db_available
def commit(self):
#~ print "DIRTY OBJECTS BEFORE COMMIT ARE",self.session.dirty
self.session.commit()
if hasattr(self, "manager"):
self.manager.frame.toggle_save(False)
self.manager.frame.toggle_check(True)
# Leave this alone, it resolves bug 159
#return "OK"
# Leave result to None, it resolves bug 159
def flush(self, result=None):
print "FLUSHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH"
result = self.do_flush(result)
if result == "integrity_error":
self.rollback()
return result
if hasattr(self, "manager"):
self.manager.frame.toggle_save(True)
return result
@db_available
def do_flush(self,result):
"""
"""
self.session.flush()
return result
def fallback(self, *args, **kwargs):
# (yassine) can't understand ancient vietnamese ...
# only fallback is we are getting data, NOT editing
elixir.metadata.bind = self.config.backend.database.fallback_url
#~ print "falling back to ",elixir.metadata.bind
# should we rerun a map_model here?
@db_available
def rollback(self):
self.session.rollback()
if hasattr(self, "manager"):
self.provide_refresh()
self.manager.frame.toggle_save(False)
# leave this alone, it resolves bug 159
#return "OK"
def provide_refresh(self):
"""
This method should flush the database and
tell the active view to refill itself; obviously.
"""
self.flush()
d = self.manager.active_component.active_mscontroller.top_controller.provide_data()
return d
def empty(self):
"""
This is useful for tests when you insert unique keys multiple times.
"""
if not self.mapped :
self.map_models()
elixir.drop_all()
# Tell people that the database has gone
self.mapped = False
def init_data(self):
"""
Populates some constant tables in the database like *_type tables, sanity etc.
"""
# Did we empty the database just before ?
if not self.mapped :
# Re-create the database
self.map_models(True)
# import a long list of objects
import initdata
self.commit()
def add(self,obj):
# FIXME:
# is there a difference between self.session and elixir.session?
# if not, why the jumping around? making the code confusing.
elixir.session.add(obj)
def get_session(self):
return self.session
def initialize(self):
"""
"""
self.initialized = True
def load_data(self,name,mapping):
"""
"""
parts = name.split("_")
csv_file = get_csv(name)
class_name = "".join([part.capitalize() for part in parts])
print "name",name
print "class_name",class_name
klass = getattr(models,class_name)
fd = file("integrity.log","a")
fd.write("Report for %s" % csv_file)
fd.write("*"*72)
self.load_from_csv(csv_file,klass,mapping,force_id=True)
def load_from_csv(self,filename,model_class,colnames_mapping={},force_id=True):
"""
"""
fd = file("integrity.log","a")
#logger = get_logger("error.log")
data = csvreader(file(filename))
header = data.next()
model_class.force_id = force_id
tablename = model_class.mapper.mapped_table.name
sequence_name = tablename+"_id_seq"
last_id = 0
for line in data:
data_dict = self.fix_data(to_dict(header,line,colnames_mapping))
id_int = int(data_dict.get("id"))
if last_id < id_int:
last_id = id_int
inst = model_class(**data_dict)
error = self.commit()
if error == "integrity_error":
error_msg = self.get_last_error_message()
fd.write("id: %s"%id_int)
# logger.error("error : (%s)" % error_msg)
# logger.error("id : (%s)" % id_int)
# logger.error("file : (%s)" % filename)
# # logger.error("line : (%s)" % line)
# # logger.error("data : (%s)" % data_dict)
self.rollback()
# try:
# # flush dosen't really commit, and a rollback will remove all prior VALID instances too
# # so we prefer to commit to keep those valid instances
# self.commit()
# except Exception,e:
# print "Exception ???",e
# logger.error(str(e))
# logger.error("file : (%s), class : (%s), id (%s)" % (filename,model_class,data_dict.get("id")))
# logger.error("data : (%s)" % data_dict)
# self.rollback()
last_id +=1 # for safety
print "restarting", sequence_name, "at", last_id
#fix_seq = DDL('ALTER SEQUENCE {0} RESTART WITH {1}'.format(sequence_name, last_id), bind=elixir.metadata.bind)
fix_seq = 'ALTER SEQUENCE {0} RESTART WITH {1}'.format(sequence_name, last_id)
self.session.execute(fix_seq)
def fix_data(self,data_dict):
"""
"""
self.fix_agency(data_dict)
self.fix_user(data_dict)
return data_dict
def fix_agency(self,data_dict):
"""
"""
agency = data_dict.get("agency")
if agency in (None,"") :
data_dict["agency"] = "alger"
def fix_user(self,data_dict):
"""
"""
user = data_dict.get("user")
if user in (None,"") :
data_dict["user"] = "coriolis"
def to_dict(header,line,colnames_mapping={}):
"""
Helper function used by DatabaseConection::load_from_csv
"""
return dict([(colnames_mapping.get(colname,colname),value != "\N" and value or None)
for (colname, value) in zip(header,line)])