Xavier (Open ERP) has proposed merging
lp:~openerp-dev/openobject-addons/trunk-newimport-xmo into lp:openobject-addons.
Requested reviews:
OpenERP Core Team (openerp)
For more details, see:
https://code.launchpad.net/~openerp-dev/openobject-addons/trunk-newimport-xmo/+merge/119562
Cleaner import client API.
* Simpler import flow for the client (a minimal sketch of the
  resulting RPC flow follows below)
* Server-specified filtering of importable elements (et al.)
* Server-specified auto-matching of fields to columns
* Native openerp objects, which may allow building standard wizards
  in the long run, and are easier to extend and instrument, stats et
  al. (e.g. import rights can be handled by filtering create/write
  access to base_import.import)
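
For reference, the first point can be made concrete with a minimal
sketch of the client-side flow against the new model's public methods
(create, parse_preview, do). The ``session`` object and its model()
accessor below stand in for whatever RPC plumbing the caller uses;
they are illustrative, not part of this branch:

    import base64

    # csv_data: raw CSV file contents (str), obtained by the caller
    Import = session.model('base_import.import')
    # create an import bound to the target model
    import_id = Import.create({'res_model': 'res.partner'})
    # upload the file: the web client posts it to /base_import/set_file,
    # a bare RPC client can simply write() the base64-encoded payload
    Import.write([import_id], {'file': base64.b64encode(csv_data),
                               'file_name': 'partners.csv'})
    # preview + server-side auto-matching of columns to fields
    options = {'quote': '"', 'separator': ',', 'headers': True}
    preview = Import.parse_preview(import_id, options)
    # run the import: one field name (or False to skip) per CSV column
    errors = Import.do(import_id, ['name', 'email', False], options)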
=== added directory 'base_import'
=== added file 'base_import/__init__.py'
--- base_import/__init__.py 1970-01-01 00:00:00 +0000
+++ base_import/__init__.py 2012-08-14 14:54:31 +0000
@@ -0,0 +1,3 @@
+import controllers
+import models
+import tests.models
=== added file 'base_import/__openerp__.py'
--- base_import/__openerp__.py 1970-01-01 00:00:00 +0000
+++ base_import/__openerp__.py 2012-08-14 14:54:31 +0000
@@ -0,0 +1,33 @@
+{
+ 'name': 'Base import',
+ 'description': """
+New extensible file import for OpenERP
+======================================
+
+Re-implement openerp's file import system:
+
+* Server side, the previous system forces most of the logic into the
+ client, which duplicates the effort between clients, makes the
+ import system much harder to use without a client (direct RPC or
+ other forms of automation) and makes knowledge about the
+ import/export system much harder to gather as it is spread over
+ 3+ different projects.
+
+* In a more extensible manner, so users and partners can build their
+ own front-end to import from other file formats (e.g. OpenDocument
+ files) which may be simpler to handle in their work flow or from
+ their data production sources.
+
+* In a module, so that administrators and users of OpenERP who do not
+ need or want an online import can avoid it being available to users.
+""",
+ 'category': 'Uncategorized',
+ 'website': 'http://www.openerp.com',
+ 'author': 'OpenERP SA',
+ 'depends': ['base'],
+ 'installable': True,
+ 'auto_install': False, # set to true and allow uninstall?
+ 'css': ['static/src/css/import.css'],
+ 'js': ['static/src/js/import.js'],
+ 'qweb': ['static/src/xml/import.xml'],
+}
=== added file 'base_import/controllers.py'
--- base_import/controllers.py 1970-01-01 00:00:00 +0000
+++ base_import/controllers.py 2012-08-14 14:54:31 +0000
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+import base64
+import simplejson
+
+try:
+ import openerp.addons.web.common.http as openerpweb
+except ImportError:
+ import web.common.http as openerpweb
+
+class ImportController(openerpweb.Controller):
+ _cp_path = '/base_import'
+
+ @openerpweb.httprequest
+ def set_file(self, req, file, import_id, jsonp='callback'):
+ import_id = int(import_id)
+
+ written = req.session.model('base_import.import').write(import_id, {
+ 'file': base64.b64encode(file.read()),
+ 'file_name': file.filename,
+ 'file_type': file.content_type,
+ }, req.session.eval_context(req.context))
+
+ return 'window.top.%s(%s)' % (
+ jsonp, simplejson.dumps({'result': written}))
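+
+ # The string returned above is loaded as a script in the upload
+ # iframe; it looks like (illustrative):
+ # window.top.import_callback_42({"result": true})
+ # where import_callback_42 is the auto-generated handler which the
+ # jsonp() helper in static/src/js/import.js binds to window.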
=== added file 'base_import/models.py'
--- base_import/models.py 1970-01-01 00:00:00 +0000
+++ base_import/models.py 2012-08-14 14:54:31 +0000
@@ -0,0 +1,352 @@
+import base64
+import codecs
+import csv
+import itertools
+import logging
+import operator
+import random
+
+try:
+ from cStringIO import StringIO
+except ImportError:
+ from StringIO import StringIO
+
+import psycopg2
+
+from openerp.osv import orm, fields
+from openerp.tools.translate import _
+
+FIELDS_RECURSION_LIMIT = 2
+ERROR_PREVIEW_BYTES = 200
+_logger = logging.getLogger(__name__)
+class ir_import(orm.TransientModel):
+ _name = 'base_import.import'
+
+ _columns = {
+ 'res_model': fields.char('Model', size=64),
+ 'file': fields.binary('File'),
+ 'file_name': fields.char('File Name', size=None),
+ 'file_type': fields.char('File Type', size=None),
+ }
+
+ def get_fields(self, cr, uid, model, context=None,
+ depth=FIELDS_RECURSION_LIMIT):
+ """ Recursively get fields for the provided model (through
+ fields_get) and filter them according to importability
+
+ The output format is a list of ``Field``, with ``Field``
+ defined as:
+
+ .. class:: Field
+
+ .. attribute:: id (str)
+
+ A non-unique identifier for the field, used to compute
+ the span of the ``required`` attribute: if multiple
+ ``required`` fields have the same id, only one of them
+ is necessary.
+
+ .. attribute:: name (str)
+
+ The field's logical (OpenERP) name within the scope of
+ its parent.
+
+ .. attribute:: string (str)
+
+ The field's human-readable name (``@string``)
+
+ .. attribute:: required (bool)
+
+ Whether the field is marked as required in the
+ model. Clients must provide non-empty import values
+ for all required fields or the import will error out.
+
+ .. attribute:: fields (list(Field))
+
+ The current field's subfields. The database and
+ external identifiers for m2o and m2m fields; a
+ filtered and transformed fields_get for o2m fields (to
+ a variable depth defined by ``depth``).
+
+ Fields with no sub-fields will have an empty list of
+ sub-fields.
+
+ :param str model: name of the model to get fields from
+ :param int depth: depth of recursion into o2m fields
+ """
+ fields = [{
+ 'id': 'id',
+ 'name': 'id',
+ 'string': _("External ID"),
+ 'required': False,
+ 'fields': [],
+ }]
+ fields_got = self.pool[model].fields_get(cr, uid, context=context)
+ for name, field in fields_got.iteritems():
+ if field.get('readonly'):
+ states = field.get('states')
+ if not states:
+ continue
+ # states = {state: [(attr, value), (attr2, value2)], state2:...}
+ if not any(attr == 'readonly' and value is False
+ for attr, value in itertools.chain.from_iterable(
+ states.itervalues())):
+ continue
+
+ f = {
+ 'id': name,
+ 'name': name,
+ 'string': field['string'],
+ # 'required' is not always set in fields_get() output
+ 'required': bool(field.get('required')),
+ 'fields': [],
+ }
+
+ if field['type'] in ('many2many', 'many2one'):
+ f['fields'] = [
+ dict(f, name='id', string=_("External ID")),
+ dict(f, name='.id', string=_("Database ID")),
+ ]
+ elif field['type'] == 'one2many' and depth:
+ f['fields'] = self.get_fields(
+ cr, uid, field['relation'], context=context, depth=depth-1)
+
+ fields.append(f)
+
+ # TODO: cache on model?
+ return fields
+
+ def _read_csv(self, record, options):
+ """ Returns a CSV-parsed iterator of all empty lines in the file
+
+ :raises csv.Error: if an error is detected during CSV parsing
+ :raises UnicodeDecodeError: if ``options.encoding`` is incorrect
+ """
+ csv_iterator = csv.reader(
+ StringIO(base64.b64decode(record.file)),
+ quotechar=options['quote'],
+ delimiter=options['separator'])
+ csv_nonempty = itertools.ifilter(None, csv_iterator)
+ # TODO: guess encoding?
+ encoding = options.get('encoding', 'utf-8')
+ return itertools.imap(
+ lambda row: [item.decode(encoding) for item in row],
+ csv_nonempty)
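+ # Illustrative: for a stored file "a,b\n\n1,2\n" with options
+ # {'quote': '"', 'separator': ',', 'encoding': 'utf-8'} the iterator
+ # yields [u'a', u'b'] then [u'1', u'2']; the blank line is dropped
+ # by the ifilter pass above.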
+
+ def _match_header(self, header, fields, options):
+ """ Attempts to match a given header to a field of the
+ imported model.
+
+ :param str header: header name from the CSV file
+ :param fields:
+ :param dict options:
+ :returns: an empty list if the header couldn't be matched, or
+ all the fields to traverse
+ :rtype: list(Field)
+ """
+ for field in fields:
+ # FIXME: should match all translations & original
+ # TODO: use string distance (levenshtein? hamming?)
+ if header == field['name'] \
+ or header.lower() == field['string'].lower():
+ return [field]
+
+ if '/' not in header:
+ return []
+
+ # relational field path
+ traversal = []
+ subfields = fields
+ # Iteratively dive into fields tree
+ for section in header.split('/'):
+ # Strip section in case spaces are added around '/' for
+ # readability of paths
+ match = self._match_header(section.strip(), subfields, options)
+ # Any match failure, exit
+ if not match: return []
+ # prep subfields for next iteration within match[0]
+ field = match[0]
+ subfields = field['fields']
+ traversal.append(field)
+ return traversal
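+
+ # Illustrative: given a field named 'partner_id' labelled "Partner"
+ # with a sub-field 'name', the header "Partner / name" resolves to
+ # [<partner_id field>, <name field>], while "Partner / phone"
+ # returns [] as soon as the second section fails to match.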
+
+ def _match_headers(self, rows, fields, options):
+ """ Attempts to match the imported model's fields to the
+ titles of the parsed CSV file, if the file is supposed to have
+ headers.
+
+ Will consume the first line of the ``rows`` iterator.
+
+ Returns a pair of (None, None) if headers were not requested,
+ otherwise the list of headers and a dict mapping cell indices
+ to key paths in the ``fields`` tree
+
+ :param Iterator rows:
+ :param dict fields:
+ :param dict options:
+ :rtype: (None, None) | (list(str), dict(int: list(str)))
+ """
+ if not options.get('headers'):
+ return None, None
+
+ headers = next(rows)
+ return headers, dict(
+ (index, [field['name'] for field in self._match_header(header, fields, options)] or None)
+ for index, header in enumerate(headers)
+ )
+
+ def parse_preview(self, cr, uid, id, options, count=10, context=None):
+ """ Generates a preview of the uploaded files, and performs
+ fields-matching between the import's file data and the model's
+ columns.
+
+ If the headers are not requested (not options.headers),
+ ``matches`` and ``headers`` are both ``False``.
+
+ :param id: identifier of the import
+ :param int count: number of preview lines to generate
+ :param options: format-specific options.
+ CSV: {encoding, quote, separator, headers}
+ :type options: {str, str, str, bool}
+ :returns: {fields, matches, headers, preview} | {error, preview}
+ :rtype: {dict(str: dict(...)), dict(int, list(str)), list(str), list(list(str))} | {str, str}
+ """
+ (record,) = self.browse(cr, uid, [id], context=context)
+ fields = self.get_fields(cr, uid, record.res_model, context=context)
+
+ try:
+ rows = self._read_csv(record, options)
+
+ headers, matches = self._match_headers(rows, fields, options)
+ # Matching should have consumed the first row (iff headers); get
+ # the ``count`` next rows for preview
+ preview = itertools.islice(rows, count)
+ return {
+ 'fields': fields,
+ 'matches': matches or False,
+ 'headers': headers or False,
+ 'preview': list(preview),
+ }
+ except (csv.Error, TypeError, UnicodeDecodeError), e:
+ # Due to lazy generators, UnicodeDecodeError (for
+ # instance) may only be raised when serializing the
+ # preview to a list in the return.
+ _logger.debug("Error during CSV parsing preview", exc_info=True)
+ return {
+ 'error': str(e),
+ # iso-8859-1 ensures decoding will always succeed,
+ # even if it yields non-printable characters. This is
+ # in case of UnicodeDecodeError (or csv.Error
+ # compounded with UnicodeDecodeError)
+ 'preview': base64.b64decode(record.file)[:ERROR_PREVIEW_BYTES]\
+ .decode('iso-8859-1'),
+ }
+
+ def _convert_import_data(self, record, fields, options, context=None):
+ """ Extracts the input browse_record and fields list (with
+ ``False``-y placeholders for fields to *not* import) into a
+ format Model.import_data can use: a fields list without holes
+ and the precisely matching data matrix
+
+ :param browse_record record:
+ :param list(str|bool) fields:
+ :returns: (data, fields)
+ :rtype: (list(list(str)), list(str))
+ :raises ValueError: in case the import data could not be converted
+ """
+ # Get indices for non-empty fields
+ indices = [index for index, field in enumerate(fields) if field]
+ if not indices:
+ raise ValueError(_("You must configure at least one field to import"))
+ # If only one index, itemgetter will return an atom rather
+ # than a 1-tuple
+ if len(indices) == 1: mapper = lambda row: [row[indices[0]]]
+ else: mapper = operator.itemgetter(*indices)
+ # Get only list of actually imported fields
+ import_fields = filter(None, fields)
+
+ rows_to_import = self._read_csv(record, options)
+ if options.get('headers'):
+ rows_to_import = itertools.islice(
+ rows_to_import, 1, None)
+ data = [
+ row for row in itertools.imap(mapper, rows_to_import)
+ # don't try inserting completely empty rows (e.g. from
+ # filtering out o2m fields)
+ if any(row)
+ ]
+
+ return data, import_fields
+
+ def do(self, cr, uid, id, fields, options, dryrun=False, context=None):
+ """ Actual execution of the import
+
+ :param fields: import mapping: maps each column to a field,
+ ``False`` for the columns to ignore
+ :type fields: list(str|bool)
+ :param dict options:
+ :param bool dryrun: performs all import operations (and
+ validations) but rolls back writes, allowing
+ as many errors as possible to be collected
+ without the risk of clobbering the database.
+ :returns: A list of errors. If the list is empty the import
+ executed fully and correctly. If the list is
+ non-empty it contains dicts with 3 keys: ``type``, the
+ type of error (``error|warning``); ``message``, the
+ error message associated with the error (a string);
+ and ``record``, the data which failed to import (or
+ ``False`` if that data isn't available or provided)
+ """
+ cr.execute('SAVEPOINT import')
+
+ (record,) = self.browse(cr, uid, [id], context=context)
+ try:
+ data, import_fields = self._convert_import_data(
+ record, fields, options, context=context)
+ except ValueError, e:
+ return [{
+ 'type': 'error',
+ 'message': str(e),
+ 'record': False,
+ }]
+
+ try:
+ _logger.info('importing %d rows...', len(data))
+ (code, record, message, _wat) = self.pool[record.res_model].import_data(
+ cr, uid, import_fields, data, context=context)
+ _logger.info('done')
+
+ except Exception, e:
+ _logger.exception("Import failed")
+ # TODO: remove when exceptions stop being an "expected"
+ # behavior of import_data on some (most) invalid
+ # input.
+ code, record, message = -1, None, str(e)
+
+ # If transaction aborted, RELEASE SAVEPOINT is going to raise
+ # an InternalError (ROLLBACK should work, maybe). Ignore that.
+ # TODO: to handle multiple errors, create savepoint around
+ # write and release it in case of write error (after
+ # adding error to errors array) => can keep on trying to
+ # import stuff, and rollback at the end if there is any
+ # error in the results.
+ try:
+ if dryrun:
+ cr.execute('ROLLBACK TO SAVEPOINT import')
+ else:
+ cr.execute('RELEASE SAVEPOINT import')
+ except psycopg2.InternalError:
+ pass
+
+ if code != -1:
+ return []
+
+ # TODO: add key for error location?
+ # TODO: error not within normal preview, how to display? Re-preview
+ # with higher ``count``?
+ return [{
+ 'type': 'error',
+ 'message': message,
+ 'record': record or False
+ }]
=== added directory 'base_import/static'
=== added directory 'base_import/static/src'
=== added directory 'base_import/static/src/js'
=== added file 'base_import/static/src/js/import.js'
--- base_import/static/src/js/import.js 1970-01-01 00:00:00 +0000
+++ base_import/static/src/js/import.js 2012-08-14 14:54:31 +0000
@@ -0,0 +1,134 @@
+openerp.base_import = function (instance) {
+ var QWeb = instance.web.qweb;
+ var _t = instance.web._t;
+ var _lt = instance.web._lt;
+
+ /**
+ * Safari does not deal well at all with raw JSON data being
+ * returned. As a result, we're going to cheat by using a
+ * pseudo-jsonp: instead of getting JSON data in the iframe, we're
+ * getting a ``script`` tag which consists of a function call and
+ * the returned data (the json dump).
+ *
+ * The function is an auto-generated name bound to ``window``,
+ * which calls back into the callback provided here.
+ *
+ * @param {Object} form the form element (DOM or jQuery) to use in the call
+ * @param {Object} attributes jquery.form attributes object
+ * @param {Function} callback function to call with the returned data
+ */
+ function jsonp(form, attributes, callback) {
+ attributes = attributes || {};
+ var options = {jsonp: _.uniqueId('import_callback_')};
+ window[options.jsonp] = function () {
+ delete window[options.jsonp];
+ callback.apply(null, arguments);
+ };
+ if ('data' in attributes) {
+ _.extend(attributes.data, options);
+ } else {
+ _.extend(attributes, {data: options});
+ }
+ _.extend(attributes, {
+ dataType: 'script',
+ });
+ $(form).ajaxSubmit(attributes);
+ }
+
+ instance.web.DataImport = instance.web.Dialog.extend({
+ template: 'ImportView',
+ dialog_title: _lt("Import Data"),
+ events: {
+ 'change input.oe_import_file': 'file_update'
+ },
+ init: function (parent, dataset) {
+ var self = this;
+ this._super(parent, {
+ buttons: [
+ {text: _t("Import File"), click: function () {
+ self.do_import();
+ }, 'class': 'oe_import_dialog_button'}
+ ]
+ });
+ this.res_model = parent.model;
+ // import object id
+ this.id = null;
+ this.Import = new instance.web.Model('base_import.import');
+ },
+ start: function () {
+ var self = this;
+ return this.Import.call('create', [{
+ 'res_model': this.res_model
+ }]).then(function (id) {
+ self.id = id;
+ self.$('input[name=import_id]').val(id);
+ });
+ },
+
+ import_options: function () {
+ return {
+ // TODO: make these parsing options user-customizable
+ quote: '"',
+ separator: ',',
+ headers: true,
+ };
+ },
+
+ //- File change section
+ file_update: function (e) {
+ if (!this.$('input.oe_import_file').val()) { return; }
+
+ this.$element.removeClass('oe_import_preview oe_import_error');
+ jsonp(this.$element, {
+ url: '/base_import/set_file'
+ }, this.proxy('file_updated'));
+ },
+ file_updated: function () {
+ // immediately trigger preview...
+ // TODO: test that the write succeeded?
+ this.Import.call(
+ 'parse_preview', [this.id, this.import_options()])
+ .then(this.proxy('preview'));
+ },
+ preview: function (result) {
+ if (result.error) {
+ this.$element.addClass('oe_import_error');
+ this.$('.oe_import_error_report').html(
+ QWeb.render('ImportView.preview.error', result));
+ } else {
+ this.$element.addClass('oe_import_preview');
+ this.$('table').html(
+ QWeb.render('ImportView.preview', result));
+ }
+ },
+
+ //- import itself
+ do_import: function () {
+ var fields = this.$('.oe_import_fields input').map(function (index, el) {
+ return el.value || false;
+ }).get();
+ this.Import.call(
+ 'do', [this.id, fields, this.import_options()], {
+ // maybe could do a dryrun after successful
+ // preview or something (note: don't go to
+ // this.result if dryrun=true)
+ dryrun: false
+ })
+ .then(this.proxy('result'));
+ },
+ result: function (errors) {
+ if (!errors.length) {
+ if (this.getParent().reload_content) {
+ this.getParent().reload_content();
+ }
+ this.close();
+ return;
+ }
+ // import failed (or maybe just warnings, if we ever get
+ // warnings?)
+ this.$element.addClass('oe_import_error');
+ this.$('.oe_import_error_report').html(
+ QWeb.render('ImportView.error', {errors: errors}));
+ },
+ });
+};
=== added directory 'base_import/static/src/xml'
=== added file 'base_import/static/src/xml/import.xml'
--- base_import/static/src/xml/import.xml 1970-01-01 00:00:00 +0000
+++ base_import/static/src/xml/import.xml 2012-08-14 14:54:31 +0000
@@ -0,0 +1,44 @@
+<templates>
+ <t t-name="ImportView">
+ <form action="" method="post" enctype="multipart/form-data" class="oe_import">
+ <input type="hidden" name="session_id"
+ t-att-value="widget.session.session_id"/>
+ <input type="hidden" name="import_id"/>
+ <label for="csvfile">CSV File:</label>
+ <input type="file" id="csvfile" name="file" class="oe_import_file"/>
+
+ <div class="oe_import_error_report"></div>
+ <table class="oe_import_grid" width="100%">
+ </table>
+ </form>
+ </t>
+ <!-- TODO: column matcher? -->
+ <t t-name="ImportView.preview">
+ <tr t-if="headers" class="oe_import_grid-header">
+ <td t-foreach="headers" t-as="header" class="oe_import_grid-cell"
+ ><t t-esc="header"/></td>
+ </tr>
+ <tr class="oe_import_fields">
+ <!-- Iterate on first row to ensure we have all columns -->
+ <td t-foreach="preview[0]" t-as="column">
+ <input placeholder="Don't Import"/>
+ </td>
+ </tr>
+ <tr t-foreach="preview" t-as="row" class="oe_import_grid-row">
+ <td t-foreach="row" t-as="cell" class="oe_import_grid-cell"
+ ><t t-esc="cell"/></td>
+ </tr>
+ </t>
+ <t t-name="ImportView.preview.error">
+ <p>Import preview failed due to: <t t-esc="error"/></p>
+ <p>Here is the start of the file we could not import:</p>
+ <pre><t t-esc="preview"/></pre>
+ </t>
+ <ul t-name="ImportView.error">
+ <li t-foreach="errors" t-as="error" t-attf-class="oe_import_report_#{error.type}">
+ <!-- can also have error.record, but may be *huge* if
+ e.g. has image fields -->
+ <t t-esc="error.message"/>
+ </li>
+ </ul>
+</templates>
=== added directory 'base_import/tests'
=== added file 'base_import/tests/__init__.py'
--- base_import/tests/__init__.py 1970-01-01 00:00:00 +0000
+++ base_import/tests/__init__.py 2012-08-14 14:54:31 +0000
@@ -0,0 +1,3 @@
+from . import test_cases
+
+checks = [test_cases]
=== added file 'base_import/tests/models.py'
--- base_import/tests/models.py 1970-01-01 00:00:00 +0000
+++ base_import/tests/models.py 2012-08-14 14:54:31 +0000
@@ -0,0 +1,101 @@
+from openerp.osv import orm, fields
+
+def name(n): return 'base_import.tests.models.%s' % n
+
+class char(orm.Model):
+ _name = name('char')
+
+ _columns = {
+ 'value': fields.char('unknown', size=None)
+ }
+
+class char_required(orm.Model):
+ _name = name('char.required')
+
+ _columns = {
+ 'value': fields.char('unknown', size=None, required=True)
+ }
+
+class char_readonly(orm.Model):
+ _name = name('char.readonly')
+
+ _columns = {
+ 'value': fields.char('unknown', size=None, readonly=True)
+ }
+
+class char_states(orm.Model):
+ _name = name('char.states')
+
+ _columns = {
+ 'value': fields.char('unknown', size=None, readonly=True, states={'draft': [('readonly', False)]})
+ }
+
+class char_noreadonly(orm.Model):
+ _name = name('char.noreadonly')
+
+ _columns = {
+ 'value': fields.char('unknown', size=None, readonly=True, states={'draft': [('invisible', True)]})
+ }
+
+class char_stillreadonly(orm.Model):
+ _name = name('char.stillreadonly')
+
+ _columns = {
+ 'value': fields.char('unknown', size=None, readonly=True, states={'draft': [('readonly', True)]})
+ }
+
+# TODO: complex field (m2m, o2m, m2o)
+class m2o(orm.Model):
+ _name = name('m2o')
+
+ _columns = {
+ 'value': fields.many2one(name('m2o.related'))
+ }
+class m2o_related(orm.Model):
+ _name = name('m2o.related')
+
+ _columns = {
+ 'value': fields.integer()
+ }
+ _defaults = {
+ 'value': 42
+ }
+
+class m2o_required(orm.Model):
+ _name = name('m2o.required')
+
+ _columns = {
+ 'value': fields.many2one(name('m2o.required.related'), required=True)
+ }
+class m2o_required_related(orm.Model):
+ _name = name('m2o.required.related')
+
+ _columns = {
+ 'value': fields.integer()
+ }
+ _defaults = {
+ 'value': 42
+ }
+
+class o2m(orm.Model):
+ _name = name('o2m')
+
+ _columns = {
+ 'value': fields.one2many(name('o2m.child'), 'parent_id')
+ }
+class o2m_child(orm.Model):
+ _name = name('o2m.child')
+
+ _columns = {
+ 'parent_id': fields.many2one(name('o2m')),
+ 'value': fields.integer()
+ }
+
+class preview_model(orm.Model):
+ _name = name('preview')
+
+ _columns = {
+ 'name': fields.char('Name', size=None),
+ 'somevalue': fields.integer('Some Value', required=True),
+ 'othervalue': fields.integer('Other Variable'),
+ }
=== added file 'base_import/tests/test_cases.py'
--- base_import/tests/test_cases.py 1970-01-01 00:00:00 +0000
+++ base_import/tests/test_cases.py 2012-08-14 14:54:31 +0000
@@ -0,0 +1,345 @@
+# -*- encoding: utf-8 -*-
+import unittest2
+from openerp.tests.common import TransactionCase
+
+from .. import models
+
+ID_FIELD = {'id': 'id', 'name': 'id', 'string': "External ID", 'required': False, 'fields': []}
+def make_field(name='value', string='unknown', required=False, fields=[]):
+ return [
+ ID_FIELD,
+ {'id': name, 'name': name, 'string': string, 'required': required, 'fields': fields},
+ ]
+
+class test_basic_fields(TransactionCase):
+ def get_fields(self, field):
+ return self.registry('base_import.import')\
+ .get_fields(self.cr, self.uid, 'base_import.tests.models.' + field)
+
+ def test_base(self):
+ """ A basic field is not required """
+ self.assertEqual(self.get_fields('char'), make_field())
+
+ def test_required(self):
+ """ Required fields should be flagged (so they can be fill-required) """
+ self.assertEqual(self.get_fields('char.required'), make_field(required=True))
+
+ def test_readonly(self):
+ """ Readonly fields should be filtered out"""
+ self.assertEqual(self.get_fields('char.readonly'), [ID_FIELD])
+
+ def test_readonly_states(self):
+ """ Readonly fields with states should not be filtered out"""
+ self.assertEqual(self.get_fields('char.states'), make_field())
+
+ def test_readonly_states_noreadonly(self):
+ """ Readonly fields with states having nothing to do with
+ readonly should still be filtered out"""
+ self.assertEqual(self.get_fields('char.noreadonly'), [ID_FIELD])
+
+ def test_readonly_states_stillreadonly(self):
+ """ Readonly fields with readonly states leaving them readonly
+ always... filtered out"""
+ self.assertEqual(self.get_fields('char.stillreadonly'), [ID_FIELD])
+
+ def test_m2o(self):
+ """ M2O fields should allow import of themselves (name_get),
+ their id and their xid"""
+ self.assertEqual(self.get_fields('m2o'), make_field(fields=[
+ {'id': 'value', 'name': 'id', 'string': 'External ID', 'required': False, 'fields': []},
+ {'id': 'value', 'name': '.id', 'string': 'Database ID', 'required': False, 'fields': []},
+ ]))
+
+ def test_m2o_required(self):
+ """ If an m2o field is required, its three sub-fields are
+ required as well (the client has to handle that: requiredness
+ is id-based)
+ """
+ self.assertEqual(self.get_fields('m2o.required'), make_field(required=True, fields=[
+ {'id': 'value', 'name': 'id', 'string': 'External ID', 'required': True, 'fields': []},
+ {'id': 'value', 'name': '.id', 'string': 'Database ID', 'required': True, 'fields': []},
+ ]))
+
+class test_o2m(TransactionCase):
+ def get_fields(self, field):
+ return self.registry('base_import.import')\
+ .get_fields(self.cr, self.uid, 'base_import.tests.models.' + field)
+
+ def test_shallow(self):
+ self.assertEqual(self.get_fields('o2m'), make_field(fields=[
+ {'id': 'id', 'name': 'id', 'string': 'External ID', 'required': False, 'fields': []},
+ # FIXME: should reverse field be ignored?
+ {'id': 'parent_id', 'name': 'parent_id', 'string': 'unknown', 'required': False, 'fields': [
+ {'id': 'parent_id', 'name': 'id', 'string': 'External ID', 'required': False, 'fields': []},
+ {'id': 'parent_id', 'name': '.id', 'string': 'Database ID', 'required': False, 'fields': []},
+ ]},
+ {'id': 'value', 'name': 'value', 'string': 'unknown', 'required': False, 'fields': []},
+ ]))
+
+class test_match_headers_single(TransactionCase):
+ def test_match_by_name(self):
+ match = self.registry('base_import.import')._match_header(
+ 'f0', [{'name': 'f0'}], {})
+
+ self.assertEqual(match, [{'name': 'f0'}])
+
+ def test_match_by_string(self):
+ match = self.registry('base_import.import')._match_header(
+ 'some field', [{'name': 'bob', 'string': "Some Field"}], {})
+
+ self.assertEqual(match, [{'name': 'bob', 'string': "Some Field"}])
+
+ def test_nomatch(self):
+ match = self.registry('base_import.import')._match_header(
+ 'should not be', [{'name': 'bob', 'string': "wheee"}], {})
+
+ self.assertEqual(match, [])
+
+ def test_recursive_match(self):
+ f = {
+ 'name': 'f0',
+ 'string': "My Field",
+ 'fields': [
+ {'name': 'f0', 'string': "Sub field 0", 'fields': []},
+ {'name': 'f1', 'string': "Sub field 2", 'fields': []},
+ ]
+ }
+ match = self.registry('base_import.import')._match_header(
+ 'f0/f1', [f], {})
+
+ self.assertEqual(match, [f, f['fields'][1]])
+
+ def test_recursive_nomatch(self):
+ """ Match first level, fail to match second level
+ """
+ f = {
+ 'name': 'f0',
+ 'string': "My Field",
+ 'fields': [
+ {'name': 'f0', 'string': "Sub field 0", 'fields': []},
+ {'name': 'f1', 'string': "Sub field 2", 'fields': []},
+ ]
+ }
+ match = self.registry('base_import.import')._match_header(
+ 'f0/f2', [f], {})
+
+ self.assertEqual(match, [])
+
+class test_match_headers_multiple(TransactionCase):
+ def test_noheaders(self):
+ self.assertEqual(
+ self.registry('base_import.import')._match_headers(
+ [], [], {}),
+ (None, None)
+ )
+ def test_nomatch(self):
+ self.assertEqual(
+ self.registry('base_import.import')._match_headers(
+ iter([
+ ['foo', 'bar', 'baz', 'qux'],
+ ['v1', 'v2', 'v3', 'v4'],
+ ]),
+ [],
+ {'headers': True}),
+ (
+ ['foo', 'bar', 'baz', 'qux'],
+ dict.fromkeys(range(4))
+ )
+ )
+
+ def test_mixed(self):
+ self.assertEqual(
+ self.registry('base_import.import')._match_headers(
+ iter(['foo bar baz qux/corge'.split()]),
+ [
+ {'name': 'bar', 'string': 'Bar'},
+ {'name': 'bob', 'string': 'Baz'},
+ {'name': 'qux', 'string': 'Qux', 'fields': [
+ {'name': 'corge', 'fields': []},
+ ]}
+ ],
+ {'headers': True}),
+ (['foo', 'bar', 'baz', 'qux/corge'], {
+ 0: None,
+ 1: ['bar'],
+ 2: ['bob'],
+ 3: ['qux', 'corge'],
+ })
+ )
+
+import base64
+class test_preview(TransactionCase):
+ def make_import(self):
+ Import = self.registry('base_import.import')
+ id = Import.create(self.cr, self.uid, {
+ 'res_model': 'res.users',
+ 'file': base64.b64encode(
+ u"로그인,언어\n".encode('euc_kr'),
+ "bob,1\n"),
+ })
+ return Import, id
+
+ def test_encoding(self):
+ Import, id = self.make_import()
+ result = Import.parse_preview(self.cr, self.uid, id, {
+ 'quote': '"',
+ 'separator': ',',
+ })
+ self.assertTrue('error' in result)
+
+ def test_csv_errors_quote(self):
+ Import, id = self.make_import()
+
+ result = Import.parse_preview(self.cr, self.uid, id, {
+ 'quote': 'foo',
+ 'separator': ',',
+ 'encoding': 'euc_kr',
+ })
+ self.assertTrue('error' in result)
+
+ def test_csv_errors_separator(self):
+ Import, id = self.make_import()
+
+ result = Import.parse_preview(self.cr, self.uid, id, {
+ 'quote': '"',
+ 'separator': 'bob',
+ 'encoding': 'euc_kr',
+ })
+ self.assertTrue('error' in result)
+
+ def test_success(self):
+ Import = self.registry('base_import.import')
+ id = Import.create(self.cr, self.uid, {
+ 'res_model': 'base_import.tests.models.preview',
+ 'file': base64.b64encode('name,Some Value,Counter\n'
+ 'foo,1,2\n'
+ 'bar,3,4\n'
+ 'qux,5,6\n')
+ })
+
+ result = Import.parse_preview(self.cr, self.uid, id, {
+ 'quote': '"',
+ 'separator': ',',
+ 'headers': True,
+ })
+
+ self.assertEqual(result['matches'], {0: ['name'], 1: ['somevalue'], 2: None})
+ self.assertEqual(result['headers'], ['name', 'Some Value', 'Counter'])
+ # Order depends on iteration order of fields_get
+ self.assertItemsEqual(result['fields'], [
+ {'id': 'id', 'name': 'id', 'string': 'External ID', 'required':False, 'fields': []},
+ {'id': 'name', 'name': 'name', 'string': 'Name', 'required':False, 'fields': []},
+ {'id': 'somevalue', 'name': 'somevalue', 'string': 'Some Value', 'required':True, 'fields': []},
+ {'id': 'othervalue', 'name': 'othervalue', 'string': 'Other Variable', 'required':False, 'fields': []},
+ ])
+ self.assertEqual(result['preview'], [
+ ['foo', '1', '2'],
+ ['bar', '3', '4'],
+ ['qux', '5', '6'],
+ ])
+ # Ensure we only have the response fields we expect
+ self.assertItemsEqual(result.keys(), ['matches', 'headers', 'fields', 'preview'])
+
+class test_convert_import_data(TransactionCase):
+ """ Tests conversion of base_import.import input into data which
+ can be fed to Model.import_data
+ """
+ def test_all(self):
+ Import = self.registry('base_import.import')
+ id = Import.create(self.cr, self.uid, {
+ 'res_model': 'base_import.tests.models.preview',
+ 'file': base64.b64encode('name,Some Value,Counter\n'
+ 'foo,1,2\n'
+ 'bar,3,4\n'
+ 'qux,5,6\n')
+ })
+ record = Import.browse(self.cr, self.uid, id)
+ data, fields = Import._convert_import_data(
+ record, ['name', 'somevalue', 'othervalue'],
+ {'quote': '"', 'separator': ',', 'headers': True,})
+
+ self.assertItemsEqual(fields, ['name', 'somevalue', 'othervalue'])
+ self.assertItemsEqual(data, [
+ ('foo', '1', '2'),
+ ('bar', '3', '4'),
+ ('qux', '5', '6'),
+ ])
+
+ def test_filtered(self):
+ """ If ``False`` is provided as field mapping for a column,
+ that column should be removed from importable data
+ """
+ Import = self.registry('base_import.import')
+ id = Import.create(self.cr, self.uid, {
+ 'res_model': 'base_import.tests.models.preview',
+ 'file': base64.b64encode('name,Some Value,Counter\n'
+ 'foo,1,2\n'
+ 'bar,3,4\n'
+ 'qux,5,6\n')
+ })
+ record = Import.browse(self.cr, self.uid, id)
+ data, fields = Import._convert_import_data(
+ record, ['name', False, 'othervalue'],
+ {'quote': '"', 'separator': ',', 'headers': True,})
+
+ self.assertItemsEqual(fields, ['name', 'othervalue'])
+ self.assertItemsEqual(data, [
+ ('foo', '2'),
+ ('bar', '4'),
+ ('qux', '6'),
+ ])
+
+ def test_norow(self):
+ """ If a row is composed only of empty values (due to having
+ filtered out non-empty values from it), it should be removed
+ """
+ Import = self.registry('base_import.import')
+ id = Import.create(self.cr, self.uid, {
+ 'res_model': 'base_import.tests.models.preview',
+ 'file': base64.b64encode('name,Some Value,Counter\n'
+ 'foo,1,2\n'
+ ',3,\n'
+ ',5,6\n')
+ })
+ record = Import.browse(self.cr, self.uid, id)
+ data, fields = Import._convert_import_data(
+ record, ['name', False, 'othervalue'],
+ {'quote': '"', 'separator': ',', 'headers': True,})
+
+ self.assertItemsEqual(fields, ['name', 'othervalue'])
+ self.assertItemsEqual(data, [
+ ('foo', '2'),
+ ('', '6'),
+ ])
+
+ def test_nofield(self):
+ Import = self.registry('base_import.import')
+
+ id = Import.create(self.cr, self.uid, {
+ 'res_model': 'base_import.tests.models.preview',
+ 'file': base64.b64encode('name,Some Value,Counter\n'
+ 'foo,1,2\n')
+ })
+
+ record = Import.browse(self.cr, self.uid, id)
+ self.assertRaises(
+ ValueError,
+ Import._convert_import_data,
+ record, [],
+ {'quote': '"', 'separator': ',', 'headers': True,})
+
+ def test_falsefields(self):
+ Import = self.registry('base_import.import')
+
+ id = Import.create(self.cr, self.uid, {
+ 'res_model': 'base_import.tests.models.preview',
+ 'file': base64.b64encode('name,Some Value,Counter\n'
+ 'foo,1,2\n')
+ })
+
+ record = Import.browse(self.cr, self.uid, id)
+ self.assertRaises(
+ ValueError,
+ Import._convert_import_data,
+ record, [False, False, False],
+ {'quote': '"', 'separator': ',', 'headers': True,})