Xavier (Open ERP) has proposed merging
lp:~openerp-dev/openobject-server/trunk-import-fixes-xmo into
lp:openobject-server.
Requested reviews:
Aline (OpenERP) (apr-tinyerp)
OpenERP Core Team (openerp)
Related bugs:
Bug #898547 in OpenERP Server: "import data, Required Fields are not selected
: name"
https://bugs.launchpad.net/openobject-server/+bug/898547
For more details, see:
https://code.launchpad.net/~openerp-dev/openobject-server/trunk-import-fixes-xmo/+merge/84447
Fixes to import_data:
* Added some tests to check behavior
* Improved doc and naming
* Fixed handling of non-id o2m subfields not preceded by an id-type field for
the same o2m (would get treated as "names" and blow up if "name_search" call
failed)
--
https://code.launchpad.net/~openerp-dev/openobject-server/trunk-import-fixes-xmo/+merge/84447
Your team OpenERP R&D Team is subscribed to branch
lp:~openerp-dev/openobject-server/trunk-import-fixes-xmo.
=== modified file 'openerp/osv/orm.py'
--- openerp/osv/orm.py 2011-12-01 12:15:03 +0000
+++ openerp/osv/orm.py 2011-12-05 09:10:26 +0000
@@ -1172,7 +1172,7 @@
datas += self.__export_row(cr, uid, row, fields_to_export, context)
return {'datas': datas}
- def import_data(self, cr, uid, fields, datas, mode='init', current_module='', noupdate=False, context=None, filename=None):
+ def import_data(self, cr, uid, fields, data, mode='init', current_module='', noupdate=False, context=None, filename=None):
"""Import given data in given module
This method is used when importing data via client menu.
@@ -1240,26 +1240,40 @@
id = ids[0][0]
return id
- # IN:
- # datas: a list of records, each record is defined by a list of values
- # prefix: a list of prefix fields ['line_ids']
- # position: the line to process, skip is False if it's the first line of the current record
- # OUT:
- # (res, position, warning, res_id) with
- # res: the record for the next line to process (including it's one2many)
- # position: the new position for the next line
- # res_id: the ID of the record if it's a modification
- def process_liness(self, datas, prefix, current_module, model_name, fields_def, position=0, skip=0):
- line = datas[position]
- row = {}
+ def process_liness(self, rows, prefix, current_module, model_name, fields_def, position=0, skip=0):
+ """ Processes a row of data to transform it into a record
+ importable into OpenERP proper (via (ir.model.data)._update)
+
+ :param rows: record data to import
+ :type rows: list[list[str]]
+ :param list[str] prefix: list of prefix fields
+ :param str current_module:
+ :param str model_name:
+ :param dict[dict] fields_def: fields definition for the current model
+ :param int position: index of the current row to import
+ :param int skip:
+ :returns: 5-tuple of import information continuation:
+ #. extracted record for db import
+ #. the position of the next line to import
+ #. warning messages generated while importing the row
+ #. identifier of the record being imported, for update
+ #. xmlid of the record being imported, for update?
+ :rtype: (dict, int, list[str], int, str)
+
+ .. note::
+ specifying a name_get (no sub-field) *and* a /.id *and* an
+ a /id (toplevel or subfield e.g. o2m) is undefined behavior if
+ they conflict
+ """
+ line = rows[position]
+ record = {}
warning = []
data_res_id = False
xml_id = False
- nbrmax = position+1
+ next_row = position+1
done = {}
for i, field in enumerate(fields):
- res = False
if i >= len(line):
raise Exception(_('Please check that all your lines have %d columns.'
'Stopped around line %d having %d columns.') % \
@@ -1271,6 +1285,15 @@
if line[i] and skip:
return False
continue
+
+ if field == prefix:
+ # o2m field by name in similarly named column
+ try:
+ data_res_id = _get_id(model_name, line[i], current_module, mode=None)
+ except ValueError:
+ pass
+ continue
+
field_name = field[len(prefix)]
#set the mode for m2o, o2m, m2m : xml_id/id/name
@@ -1311,25 +1334,27 @@
newfd = relation_obj.fields_get( cr, uid, context=context )
pos = position
- res = many_ids(line[i], relation, current_module, mode)
+ res = [] # many_ids(line[i], relation, current_module, mode)
first = 0
- while pos < len(datas):
- res2 = process_liness(self, datas, prefix + [field_name], current_module, relation_obj._name, newfd, pos, first)
+ while pos < len(rows):
+ res2 = process_liness(self, rows, prefix + [field_name], current_module, relation_obj._name, newfd, pos, first)
if not res2:
break
- (newrow, pos, w2, data_res_id2, xml_id2) = res2
- nbrmax = max(nbrmax, pos)
- warning += w2
+ (o2m_record, pos, additional_warnings, o2m_id, o2m_external_id) = res2
+ next_row = max(next_row, pos)
+ warning += additional_warnings
first += 1
- if data_res_id2:
- res.append((4, data_res_id2))
+ if o2m_id:
+ res.append((4, o2m_id))
- if (not newrow) or not reduce(lambda x, y: x or y, newrow.values(), 0):
+ # Found no values to write to linked o2m, bail out
+ if not (o2m_record and any(o2m_record.itervalues())):
break
- res.append( (data_res_id2 and 1 or 0, data_res_id2 or 0, newrow) )
+ # Create o2m if no matching o2m record, else update it
+ res.append((1 if o2m_id else 0, o2m_id or False, o2m_record) )
elif field_type == 'many2one':
relation = fields_def[field_name]['relation']
@@ -1346,23 +1371,22 @@
elif field_type == 'float':
res = line[i] and float(line[i]) or 0.0
elif field_type == 'selection':
+ res = False
for key, val in fields_def[field_name]['selection']:
if tools.ustr(line[i]) in [tools.ustr(key), tools.ustr(val)]:
res = key
break
- if line[i] and not res:
+ if not res:
+ message = _("Key/value '%s' not found in selection field '%s'")
logging.getLogger('orm.import').warn(
- _("key '%s' not found in selection field '%s'"),
- tools.ustr(line[i]), tools.ustr(field_name))
- warning.append(_("Key/value '%s' not found in selection field '%s'") % (
- tools.ustr(line[i]), tools.ustr(field_name)))
-
+ message, tools.ustr(line[i]), tools.ustr(field_name))
+ warning.append(message % (tools.ustr(line[i]), tools.ustr(field_name)))
else:
res = line[i]
- row[field_name] = res or False
+ record[field_name] = res or False
- return row, nbrmax, warning, data_res_id, xml_id
+ return record, next_row, warning, data_res_id, xml_id
fields_def = self.fields_get(cr, uid, context=context)
@@ -1372,19 +1396,19 @@
data = pickle.load(partial_import_file)
position = data.get(filename, 0)
- while position<len(datas):
+ while position < len(data):
(res, position, warning, res_id, xml_id) = \
- process_liness(self, datas, [], current_module, self._name, fields_def, position=position)
- if len(warning):
+ process_liness(self, data, [], current_module, self._name, fields_def, position=position)
+ if warning:
cr.rollback()
- return -1, res, 'Line ' + str(position) +' : ' + '!\n'.join(warning), ''
+ return -1, res, _("Row %d: %s") % (position, '\n'.join(warning)), ''
try:
ir_model_data_obj._update(cr, uid, self._name,
current_module, res, mode=mode, xml_id=xml_id,
noupdate=noupdate, res_id=res_id, context=context)
except Exception, e:
- return -1, res, 'Line ' + str(position) + ' : ' + tools.ustr(e), ''
+ return -1, res, _("Row %d: %s") % (position, tools.ustr(e)), ''
if config.get('import_partial') and filename and (not (position%100)):
with open(config.get('import_partial'), 'rb') as partial_import:
=== added file 'openerp/tests/test_import.py'
--- openerp/tests/test_import.py 1970-01-01 00:00:00 +0000
+++ openerp/tests/test_import.py 2011-12-05 09:10:26 +0000
@@ -0,0 +1,207 @@
+# -*- coding: utf-8 -*-
+import collections
+import unittest2
+
+from openerp.osv import orm, fields
+
+class Myth(orm.Model):
+ _name = 'myth'
+ _columns = {
+ 'foo': fields.char('Foo', 42),
+ 'bar': fields.char('Bar', 42)
+ }
+
+class RelatedMyth(orm.Model):
+ _name = 'myth.related'
+ _columns = {
+ 'name': fields.char('Name', 0)
+ }
+ def __init__(self, *args):
+ super(RelatedMyth, self).__init__(*args)
+ self._records = {}
+ self._names = {}
+ def name_search(self, cr, user, name='', *args, **kwargs):
+ id = self._names.get(name)
+ if id:
+ return [(id, name)]
+ return []
+ def search(self, cr, uid, predicate, *args, **kwargs):
+ pred, = predicate
+ field, op, id = pred
+ r = self._records.get(id)
+ if r:
+ return [r]
+ return []
+class RelatingMyth(orm.Model):
+ _name = 'myth.relating'
+ _columns = {
+ 'name': fields.char('name', 0),
+ 'children': fields.one2many('myth.related', 'parent', 'Children')
+ }
+ def _name_search(self, *args, **kwargs):
+ return []
+
+class FakeModel(object):
+ _inherits = []
+ def check(self, *args):
+ return True
+ def _get_source(self, *args):
+ return None
+# should probably be a mock.Mock instance
+ModelRecord = collections.namedtuple('ModelRecord', 'model record xmlid dbid')
+class FakeIrModelData(FakeModel):
+ def record(self):
+ self.records = []
+ def _update(self, cr, uid, model, module, values,
+ xml_id=False, store=True, noupdate=False, mode='init',
+ res_id=False, context=None):
+ self.records.append(
+ ModelRecord(model=model, record=values, xmlid=xml_id, dbid=res_id))
+
+class FakePool(object):
+ _store_function = {}
+ models = collections.defaultdict(FakeModel)
+ def __init__(self):
+ self.models['ir.model.data'] = FakeIrModelData()
+ def add(self, name, item):
+ self.models[name] = item
+ def get(self, name):
+ return self.models[name]
+class FakeCursor(object):
+ def execute(self, stmt, params):
+ pass
+ def fetchone(self):
+ pass
+
+class TestBasicImport(unittest2.TestCase):
+ def setUp(self):
+ self.pool = FakePool()
+ self.pool.get('ir.model.data').record()
+ self.cr = FakeCursor()
+ self.instance = Myth.create_instance(self.pool, self.cr)
+
+ def test_import_nothing(self):
+ self.assertEqual(
+ (0, 0, 0, 0),
+ self.instance.import_data(self.cr, 'uid', ['foo', 'bar'], []))
+ def test_import_single(self):
+ self.assertEqual(
+ (1, 0, 0, 0),
+ self.instance.import_data(
+ self.cr, 'uid', ['foo', 'bar'], [['foo', 'bar']]))
+ self.assertEqual(
+ [('myth', {'foo': 'foo', 'bar': 'bar'}, False, False)],
+ self.pool.get('ir.model.data').records)
+ def test_import_two(self):
+ self.assertEqual(
+ (2, 0, 0, 0),
+ self.instance.import_data(
+ self.cr, 'uid', ['foo', 'bar'], [
+ ['foo', 'bar'],
+ ['qux', 'quux']]))
+ self.assertEqual(
+ [('myth', {'foo': 'foo', 'bar': 'bar'}, False, False),
+ ('myth', {'foo': 'qux', 'bar': 'quux'}, False, False)],
+ self.pool.get('ir.model.data').records)
+
+class TestO2MImportEmpty(unittest2.TestCase):
+ def setUp(self):
+ self.pool = FakePool()
+ self.pool.get('ir.model.data').record()
+ self.cr = FakeCursor()
+ self.instance = RelatingMyth.create_instance(self.pool, self.cr)
+ RelatedMyth.create_instance(self.pool, self.cr)
+
+ def test_import_one(self):
+ self.assertEqual(
+ (1, 0, 0, 0),
+ self.instance.import_data(
+ self.cr, 'uid', ['name', 'children/name'],
+ [['parent', 'child']]))
+ self.assertEqual(
+ [('myth.relating',
+ {'name': 'parent', 'children': [(0, 0, {'name': 'child'})]},
+ False, False)],
+ self.pool.get('ir.model.data').records)
+
+ def test_import_multiple_children(self):
+ self.assertEqual(
+ (3, 0, 0, 0),
+ self.instance.import_data(
+ self.cr, 'uid', ['name', 'children/name'],
+ [['parent', 'child'],
+ ['', 'child2'],
+ ['', 'child3']]))
+ self.assertEqual(
+ [('myth.relating',
+ {'name': 'parent', 'children': [
+ (0, 0, {'name': 'child'}),
+ (0, 0, {'name': 'child2'}),
+ (0, 0, {'name': 'child3'})]},
+ False, False)],
+ self.pool.get('ir.model.data').records)
+
+ def test_import_multiple_parent(self):
+ self.assertEqual(
+ (3, 0, 0, 0),
+ self.instance.import_data(
+ self.cr, 'uid', ['name', 'children/name'],
+ [['parent', 'child'],
+ ['parent2', 'child2'],
+ ['parent3', 'child3']]))
+ self.assertEqual(
+ [('myth.relating', {'name': 'parent', 'children': [(0, 0, {'name': 'child'})]}, False, False),
+ ('myth.relating', {'name': 'parent2', 'children': [(0, 0, {'name': 'child2'})]}, False, False),
+ ('myth.relating', {'name': 'parent3', 'children': [(0, 0, {'name': 'child3'})]}, False, False)
+ ],
+ self.pool.get('ir.model.data').records)
+
+ def test_import_multiple_everything(self):
+ self.assertEqual(
+ (4, 0, 0, 0),
+ self.instance.import_data(
+ self.cr, 'uid', ['name', 'children/name'],
+ [['parent', 'child'],
+ ['', 'child2'],
+ ['parent2', 'child3'],
+ ['', 'child4']]))
+ self.assertEqual(
+ [('myth.relating', {'name': 'parent', 'children': [
+ (0, 0, {'name': 'child'}), (0, 0, {'name': 'child2'})]}, False, False),
+ ('myth.relating', {'name': 'parent2', 'children': [
+ (0, 0, {'name': 'child3'}), (0, 0, {'name': 'child4'})]}, False, False)],
+ self.pool.get('ir.model.data').records)
+
+class TestO2MImportExist(unittest2.TestCase):
+ def setUp(self):
+ self.pool = FakePool()
+ self.pool.get('ir.model.data').record()
+ self.cr = FakeCursor()
+ self.instance = RelatingMyth.create_instance(self.pool, self.cr)
+ self.o2m = RelatedMyth.create_instance(self.pool, self.cr)
+
+ def test_db_id(self):
+ self.o2m._records[42] = {'name': 'bob'}
+ self.assertEqual(
+ (1, 0, 0, 0),
+ self.instance.import_data(
+ self.cr, 'uid', ['name', 'children/name', 'children/.id'],
+ [['parent', 'child', '42']]))
+ self.assertEqual(
+ [('myth.relating',
+ {'name': 'parent', 'children': [(4, 42), (1, 42, {'name': 'child'})]},
+ False, False)],
+ self.pool.get('ir.model.data').records)
+
+ def test_db_name(self):
+ self.o2m._names['ed'] = 42
+ self.assertEqual(
+ (1, 0, 0, 0),
+ self.instance.import_data(
+ self.cr, 'uid', ['name', 'children/name', 'children'],
+ [['parent', 'child', 'ed']]))
+ self.assertEqual(
+ [('myth.relating',
+ {'name': 'parent', 'children': [(4, 42), (1, 42, {'name': 'child'})]},
+ False, False)],
+ self.pool.get('ir.model.data').records)
_______________________________________________
Mailing list: https://launchpad.net/~openerp-dev-gtk
Post to : [email protected]
Unsubscribe : https://launchpad.net/~openerp-dev-gtk
More help : https://help.launchpad.net/ListHelp