Hello community,

here is the log from the commit of package obs-service-tar_scm for 
openSUSE:Factory checked in at 2012-02-17 12:18:38
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/obs-service-tar_scm (Old)
 and      /work/SRC/openSUSE:Factory/.obs-service-tar_scm.new (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "obs-service-tar_scm", Maintainer is ""

Changes:
--------
--- /work/SRC/openSUSE:Factory/obs-service-tar_scm/obs-service-tar_scm.changes  
2012-01-04 07:25:39.000000000 +0100
+++ 
/work/SRC/openSUSE:Factory/.obs-service-tar_scm.new/obs-service-tar_scm.changes 
    2012-02-17 12:18:39.000000000 +0100
@@ -1,0 +2,37 @@
+Thu Feb 16 15:23:35 GMT 2012 - aspi...@suse.com
+
+- When the cache is used, output location of repo in the cache
+
+-------------------------------------------------------------------
+Tue Feb 14 16:52:19 GMT 2012 - aspi...@suse.com
+
+- add new 'versionformat' option to determine how version is
+  extracted via git show --pretty=...
+- support caching of cloned repositories to speed up fetch
+  from upstream
+
+-------------------------------------------------------------------
+Mon Feb  13 15:52:19 GMT 2012 - aspi...@suse.com
+
+- Add test suite
+- Fix --subdir with --scm svn
+- Fix --scm bzr
+
+-------------------------------------------------------------------
+Mon Feb 13 10:51:19 UTC 2012 - co...@suse.com
+
+- patch license to follow spdx.org standard
+
+-------------------------------------------------------------------
+Tue Jan 24 15:46:17 UTC 2012 - rschi...@gmail.com
+
+- add new option to specify a subset of files/subdirectories to
+  pack in the tar ball
+
+-------------------------------------------------------------------
+Tue Jan 24 13:26:19 UTC 2012 - rschi...@gmail.com
+
+- Checking out a specific revision cannot work when only the latest
+  version is cloned.
+
+-------------------------------------------------------------------

New:
----
  bzrfixtures.py
  bzrtests.py
  commontests.py
  fixtures.py
  gitfixtures.py
  githgtests.py
  gittests.py
  hgfixtures.py
  hgtests.py
  scm-wrapper
  scmlogs.py
  svnfixtures.py
  svntests.py
  tar_scm.rc
  test.py
  testassertions.py
  testenv.py
  utils.py

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ obs-service-tar_scm.spec ++++++
--- /var/tmp/diff_new_pack.G30PbZ/_old  2012-02-17 12:18:42.000000000 +0100
+++ /var/tmp/diff_new_pack.G30PbZ/_new  2012-02-17 12:18:42.000000000 +0100
@@ -1,7 +1,7 @@
 #
 # spec file for package obs-service-tar_scm
 #
-# Copyright (c) 2011 SUSE LINUX Products GmbH, Nuernberg, Germany.
+# Copyright (c) 2012 SUSE LINUX Products GmbH, Nuernberg, Germany.
 #
 # All modifications and additions to the file contributed by third parties
 # remain the property of their copyright owners, unless otherwise agreed
@@ -16,17 +16,44 @@
 #
 
 
+%define service tar_scm
 
-Name:           obs-service-tar_scm
-License:        GPL v2 or later
-Group:          Development/Tools/Building
+Name:           obs-service-%{service}
 Summary:        An OBS source service: checkout or update a tar ball from 
svn/git/hg
-Url:            
https://build.opensuse.org/package/show?package=obs-service-tar_scm&project=openSUSE%3ATools
-Version:        0.2.1
-Release:        1
-Source:         tar_scm
-Source1:        tar_scm.service
-Requires:       subversion git mercurial bzr
+License:        GPL-2.0+
+Group:          Development/Tools/Building
+Url:            
https://build.opensuse.org/package/show?package=obs-service-%{service}&project=openSUSE%3ATools
+Version:        0.2.3
+Release:        0
+Source:         %{service}
+Source1:        %{service}.service
+Source2:        %{service}.rc
+
+# test suite files
+Source100:      bzrfixtures.py
+Source101:      bzrtests.py
+Source102:      commontests.py
+Source103:      fixtures.py
+Source104:      gitfixtures.py
+Source105:      githgtests.py
+Source106:      gittests.py
+Source107:      hgfixtures.py
+Source108:      hgtests.py
+Source109:      scmlogs.py
+Source110:      svnfixtures.py
+Source111:      svntests.py
+Source112:      testassertions.py
+Source113:      testenv.py
+Source114:      test.py
+Source115:      utils.py
+Source116:      scm-wrapper
+
+Requires:       bzr git mercurial subversion
+BuildRequires:  bzr
+BuildRequires:  git
+BuildRequires:  mercurial
+BuildRequires:  python >= 2.6
+BuildRequires:  subversion
 BuildRoot:      %{_tmppath}/%{name}-%{version}-build
 BuildArch:      noarch
 
@@ -46,9 +73,20 @@
 install -m 0755 %{SOURCE0} $RPM_BUILD_ROOT/usr/lib/obs/service
 install -m 0644 %{SOURCE1} $RPM_BUILD_ROOT/usr/lib/obs/service
 
+mkdir -p $RPM_BUILD_ROOT/etc/obs/services
+install -m 0644 %{SOURCE2} $RPM_BUILD_ROOT/etc/obs/services/%{service}
+
+%check
+chmod +x $RPM_SOURCE_DIR/scm-wrapper
+: Running the test suite.  Please be patient - this takes a few minutes ...
+python $RPM_SOURCE_DIR/test.py
+
 %files
 %defattr(-,root,root)
 %dir /usr/lib/obs
 /usr/lib/obs/service
+%dir /etc/obs
+%dir /etc/obs/services
+%config(noreplace) /etc/obs/services/*
 
 %changelog

++++++ bzrfixtures.py ++++++
#!/usr/bin/python

import os

from   fixtures  import Fixtures
from   utils     import mkfreshdir, run_bzr

class BzrFixtures(Fixtures):
    def init(self):
        self.create_repo()
        self.create_commits(2)

    def run(self, cmd):
        return run_bzr(self.repo_path, cmd)

    def create_repo(self):
        os.makedirs(self.repo_path)
        os.chdir(self.repo_path)
        self.run('init')
        self.run('whoami "%s"' % self.name_and_email)
        self.wd = self.repo_path
        print "created repo", self.repo_path

    def do_commit(self, newly_created):
        self.run('add .')
        self.run('commit -m%d' % self.next_commit_rev)

    def record_rev(self, rev_num):
        self.revs[rev_num] = str(rev_num)
        self.scmlogs.annotate("Recorded rev %d" % rev_num)
++++++ bzrtests.py ++++++
#!/usr/bin/python

from   commontests import CommonTests
from   bzrfixtures import BzrFixtures
from   utils       import run_bzr

class BzrTests(CommonTests):
    """tar_scm tests run against a Bazaar fixture repository."""

    scm = 'bzr'
    initial_clone_command = 'bzr checkout'
    update_cache_command  = 'bzr update'
    fixtures_class = BzrFixtures

    def default_version(self):
        """Version expected when no --version/--versionformat is given."""
        second_rev = self.rev(2)
        return second_rev

    def test_versionformat_rev(self):
        """%r in --versionformat expands to the revision number."""
        self.tar_scm_std('--versionformat', 'myrev%r.svn')
        expected = self.basename(version = 'myrev2.svn')
        self.assertTarOnly(expected)

    def test_version_versionformat(self):
        """--versionformat decides the tarball name even with --version."""
        cli_args = ['--version', '3.0', '--versionformat', 'myrev%r.svn']
        self.tar_scm_std(*cli_args)
        expected = self.basename(version = 'myrev2.svn')
        self.assertTarOnly(expected)

    def test_versionformat_revision(self):
        """--revision pins both the formatted version and the content."""
        self.fixtures.create_commits(4)
        cli_args = ['--versionformat', 'foo%r', '--revision', self.rev(2)]
        self.tar_scm_std(*cli_args)
        basename = self.basename(version = 'foo2')
        tar_handle = self.assertTarOnly(basename)
        member = basename + '/a'
        self.assertTarMemberContains(tar_handle, member, '2')
++++++ commontests.py ++++++
#!/usr/bin/python

from pprint         import pprint, pformat

from testassertions import TestAssertions
from testenv        import TestEnvironment
from utils          import mkfreshdir

class CommonTests(TestEnvironment, TestAssertions):
    """
    Tests shared by all SCM-specific test classes.

    Concrete subclasses supply ``scm`` and ``default_version()``.
    Helpers such as ``tar_scm_std()``, ``rev()``, ``fixtures``,
    ``scmlogs``, ``pkgdir``, ``disableCache()`` and ``postRun()`` are
    not defined in this class; presumably they come from
    TestEnvironment (verify against testenv.py).
    """

    def basename(self, name='repo', version=None):
        """Return '<name>-<version>', defaulting to default_version()."""
        if version is None:
            version = self.default_version()
        return '%s-%s' % (name, version)

    def test_plain(self):
        self.tar_scm_std()
        self.assertTarOnly(self.basename())

    def test_exclude(self):
        self.tar_scm_std('--exclude', '.' + self.scm)
        self.assertTarOnly(self.basename())

    def test_subdir(self):
        self.tar_scm_std('--subdir', self.fixtures.subdir)
        self.assertTarOnly(self.basename(), tarchecker=self.assertSubdirTar)

    def test_history_depth_obsolete(self):
        (stdout, stderr, ret) = self.tar_scm_std('--history-depth', '1')
        self.assertRegexpMatches(stdout, 'obsolete')
        # self.assertTarOnly(self.basename())
        # self.assertRegexpMatches(self.scmlogs.read()[0], '^%s clone --depth=1')

    # def test_history_depth_full(self):
    #     self.tar_scm_std('--history-depth', 'full')
    #     self.assertTarOnly(self.basename())
    #     self.assertRegexpMatches(self.scmlogs.read()[0], '^git clone --depth=999999+')

    def test_filename(self):
        name = 'myfilename'
        self.tar_scm_std('--filename', name)
        self.assertTarOnly(self.basename(name=name))

    def test_version(self):
        version = '0.5'
        self.tar_scm_std('--version', version)
        self.assertTarOnly(self.basename(version=version))

    def test_filename_version(self):
        filename = 'myfilename'
        version = '0.6'
        self.tar_scm_std('--filename', filename, '--version', version)
        self.assertTarOnly(self.basename(filename, version))

    def test_revision_nop(self):
        self.tar_scm_std('--revision', self.rev(2))
        th = self.assertTarOnly(self.basename())
        self.assertTarMemberContains(th, self.basename() + '/a', '2')

    def test_revision(self):
        self._revision()

    def test_revision_no_cache(self):
        self._revision(use_cache=False)

    def test_revision_subdir(self):
        self._revision(use_subdir=True)

    def test_revision_subdir_no_cache(self):
        self._revision(use_cache=False, use_subdir=True)

    def _revision(self, use_cache=True, use_subdir=False):
        """
        Repeatedly request revision 2 while new commits keep arriving;
        the tarball must always contain the content of revision 2.
        """
        version = '3.0'
        args_tag2 = [
            '--version', version,
            '--revision', self.rev(2),
        ]
        if use_subdir:
            args_tag2 += [ '--subdir', self.fixtures.subdir ]
        self._sequential_calls_with_revision(
            version,
            [
                (0, args_tag2, '2', False),
                (0, args_tag2, '2', use_cache),
                (2, args_tag2, '2', use_cache),
                (0, args_tag2, '2', use_cache),
                (2, args_tag2, '2', use_cache),
                (0, args_tag2, '2', use_cache),
            ],
            use_cache
        )

    def test_revision_master_alternating(self):
        self._revision_master_alternating()

    def test_revision_master_alternating_no_cache(self):
        self._revision_master_alternating(use_cache=False)

    def test_revision_master_alternating_subdir(self):
        self._revision_master_alternating(use_subdir=True)

    def test_revision_master_alternating_subdir_no_cache(self):
        self._revision_master_alternating(use_cache=False, use_subdir=True)

    def _revision_master_alternating(self, use_cache=True, use_subdir=False):
        """
        Alternate between requesting revision 2 and requesting no
        specific revision, creating new commits in between.
        """
        version = '4.0'
        args_head = [
            '--version', version,
        ]
        if use_subdir:
            args_head += [ '--subdir', self.fixtures.subdir ]

        args_tag2 = args_head + [ '--revision', self.rev(2) ]
        self._sequential_calls_with_revision(
            version,
            [
                (0, args_tag2, '2', False),
                (0, args_head, '2', use_cache),
                (2, args_tag2, '2', use_cache),
                (0, args_head, '4', use_cache),
                (2, args_tag2, '2', use_cache),
                (0, args_head, '6', use_cache),
                (0, args_tag2, '2', use_cache),
            ],
            use_cache
        )

    def _sequential_calls_with_revision(self, version, calls, use_cache=True):
        """
        Run tar_scm once per entry of ``calls`` and verify each result.

        Each entry is a tuple
        ``(new_commits, args, expected, expect_cache_hit)``:
        ``new_commits`` commits are created first, tar_scm is run with
        ``args``, the SCM invocation log must show an update (cache hit)
        or an initial clone (no cache hit), and the checked tar member
        must contain ``expected``.

        ``calls`` is only iterated, so the caller's list is left intact.
        """
        mkfreshdir(self.pkgdir)
        basename = self.basename(version = version)

        if not use_cache:
            self.disableCache()

        for new_commits, args, expected, expect_cache_hit in calls:
            if new_commits > 0:
                self.fixtures.create_commits(new_commits)
            self.scmlogs.annotate("about to run: " + pformat(args))
            self.scmlogs.annotate("expecting tar to contain: " + expected)
            self.tar_scm_std(*args)
            logpath  = self.scmlogs.current_log_path
            loglines = self.scmlogs.read()
            if expect_cache_hit:
                self.assertRanUpdate(logpath, loglines)
            else:
                self.assertRanInitialClone(logpath, loglines)

            # With --subdir the tarball holds the subdirectory's file 'b'
            # instead of the top-level file 'a'.
            if self.fixtures.subdir in args:
                th = self.assertTarOnly(basename,
                                        tarchecker=self.assertSubdirTar)
                tarent = 'b'
            else:
                th = self.assertTarOnly(basename)
                tarent = 'a'
            self.assertTarMemberContains(th, basename + '/' + tarent, expected)

            # Use a separate invocation log for the next tar_scm run.
            self.scmlogs.next()
            self.postRun()

    def test_switch_revision_and_subdir(self):
        self._switch_revision_and_subdir()

    def test_switch_revision_and_subdir_no_cache(self):
        self._switch_revision_and_subdir(use_cache=False)

    def _switch_revision_and_subdir(self, use_cache=True):
        """
        Alternate between a --revision run on the whole tree and a
        --subdir run without a revision.
        """
        version = '5.0'
        args = [
            '--version', version,
        ]
        args_subdir = args + [ '--subdir', self.fixtures.subdir ]

        args_tag2 = args + [ '--revision', self.rev(2) ]
        self._sequential_calls_with_revision(
            version,
            [
                (0, args_tag2,   '2', False),
                # For svn the first switch to --subdir is expected to
                # run an initial checkout rather than an update.
                (0, args_subdir, '2', use_cache and self.scm != 'svn'),
                (2, args_tag2,   '2', use_cache),
                (0, args_subdir, '4', use_cache),
                (2, args_tag2,   '2', use_cache),
                (0, args_subdir, '6', use_cache),
                (0, args_tag2,   '2', use_cache),
            ],
            use_cache
        )
++++++ fixtures.py ++++++
#!/usr/bin/python

import os
import shutil

class Fixtures:
    """
    Base class for the per-SCM fixture repositories of the test suite.

    Subclasses must implement init(), do_commit(newly_created) and
    record_rev(rev_num), and must set ``self.wd`` (the directory in
    which commits are prepared) before create_commits() is called.
    """

    name  = 'tar_scm test suite'
    email = 'root@localhost'
    name_and_email = '%s <%s>' % (name, email)

    subdir = 'subdir'
    subdir1 = 'subdir1'
    subdir2 = 'subdir2'
    # Sequence number the next commit will get; incremented per commit.
    next_commit_rev = 1

    def __init__(self, container_dir, scmlogs):
        """
        container_dir -- directory under which the repository is created
        scmlogs       -- object whose annotate(msg) method receives
                         progress messages
        """
        self.container_dir = container_dir
        self.scmlogs       = scmlogs
        self.repo_path     = self.container_dir + '/repo'
        self.repo_url      = 'file://' + self.repo_path

        # Keys are the integer commit sequence numbers which
        # create_commits() hands to record_rev(); values can be passed
        # to --revision
        self.revs = { }

    def setup(self):
        """Remove any previous repository and build a fresh one."""
        print(self.__class__.__name__ + ": setting up fixtures")
        self.init_fixtures_dir()
        self.init()

    def init_fixtures_dir(self):
        """Delete the repository directory if it already exists."""
        if os.path.exists(self.repo_path):
            shutil.rmtree(self.repo_path)

    def init(self):
        """Create and populate the repository; subclass responsibility."""
        raise NotImplementedError(
            self.__class__.__name__ + " didn't implement init()")

    def create_commits(self, num_commits):
        """
        Create ``num_commits`` commits and record only the last of them
        via record_rev().  Zero or negative counts create nothing.
        """
        self.scmlogs.annotate("Creating %d commits ..." % num_commits)
        if num_commits <= 0:
            # Nothing was created, so there is no revision to record.
            return

        for _ in range(num_commits):
            new_rev = self.create_commit()
        self.record_rev(new_rev)

        self.scmlogs.annotate(
            "Created %d commits; now at %s" % (num_commits, new_rev))

    def create_commit(self):
        """Create one commit in self.wd and return its sequence number."""
        os.chdir(self.wd)
        newly_created = self.prep_commit()
        self.do_commit(newly_created)
        new_rev = self.next_commit_rev
        self.next_commit_rev += 1
        return new_rev

    def prep_commit(self):
        """
        Caller should ensure correct cwd.
        Returns list of newly created files.
        """
        newly_created = [ ]

        if not os.path.exists('a'):
            newly_created.append('a')

        if not os.path.exists(self.subdir):
            os.mkdir(self.subdir)
            # This will take care of adding subdir/b too
            newly_created.append(self.subdir)

        # Both files always hold the number of the commit being prepared.
        for fn in ('a', self.subdir + '/b'):
            with open(fn, 'w') as f:
                f.write(str(self.next_commit_rev))

        return newly_created
++++++ gitfixtures.py ++++++
#!/usr/bin/python

import os

from   fixtures  import Fixtures
from   utils     import mkfreshdir, run_git

class GitFixtures(Fixtures):
    def init(self):
        self.create_repo()

        self.timestamps   = { }
        self.sha1s        = { }

        self.create_commits(2)

    def run(self, cmd):
        return run_git(self.repo_path, cmd)

    def create_repo(self):
        os.makedirs(self.repo_path)
        os.chdir(self.repo_path)
        self.run('init')
        self.run('config user.name test')
        self.run('config user.email t...@test.com')
        self.wd = self.repo_path
        print "created repo", self.repo_path

    def do_commit(self, newly_created):
        self.run('add .')
        self.run('commit -m%d' % self.next_commit_rev)

    def get_metadata(self, formatstr):
        return self.run('log -n1 --pretty=format:"%s"' % formatstr)[0]

    def record_rev(self, rev_num):
        tag = 'tag' + str(rev_num)
        self.run('tag ' + tag)
        self.revs[rev_num]   = tag
        self.timestamps[tag] = self.get_metadata('%at')
        self.sha1s[tag]      = self.get_metadata('%h')
        self.scmlogs.annotate(
            "Recorded rev %d: id %s, timestamp %s, SHA1 %s" % \
                (rev_num,
                 tag,
                 self.timestamps[tag],
                 self.sha1s[tag])
        )
++++++ githgtests.py ++++++
#!/usr/bin/python

import os

from   commontests import CommonTests
from   utils       import run_hg

class GitHgTests(CommonTests):
    """
    Tests common to git and hg.

    Subclasses provide ``abbrev_hash_format`` and ``timestamp_format``.
    ``sha1s()`` and ``timestamps()`` are not defined in this class.
    """

    mixed_version_template = '%s.master.%s'

    def test_versionformat_abbrevhash(self):
        self.tar_scm_std('--versionformat', self.abbrev_hash_format)
        expected_version = self.sha1s(self.rev(2))
        self.assertTarOnly(self.basename(version = expected_version))

    def test_versionformat_timestamp(self):
        self.tar_scm_std('--versionformat', self.timestamp_format)
        expected_version = self.timestamps(self.rev(2))
        self.assertTarOnly(self.basename(version = expected_version))

    def _mixed_version_format(self):
        """Format string combining the timestamp and hash placeholders."""
        placeholders = (self.timestamp_format, self.abbrev_hash_format)
        return self.mixed_version_template % placeholders

    def _mixed_version(self):
        """Version expected from _mixed_version_format() for revision 2."""
        timestamp = self.timestamps(self.rev(2))
        sha1 = self.sha1s(self.rev(2))
        return self.mixed_version_template % (timestamp, sha1)

    def test_versionformat_mixed(self):
        self.tar_scm_std('--versionformat', self._mixed_version_format())
        expected_version = self._mixed_version()
        self.assertTarOnly(self.basename(version = expected_version))

    def test_version_versionformat(self):
        cli_args = ['--version', '3.0',
                    '--versionformat', self._mixed_version_format()]
        self.tar_scm_std(*cli_args)
        expected_version = self._mixed_version()
        self.assertTarOnly(self.basename(version = expected_version))

    def test_versionformat_revision(self):
        self.fixtures.create_commits(4)
        cli_args = ['--versionformat', self.abbrev_hash_format,
                    '--revision', self.rev(2)]
        self.tar_scm_std(*cli_args)
        basename = self.basename(version = self.sha1s(self.rev(2)))
        tar_handle = self.assertTarOnly(basename)
        member = basename + '/a'
        self.assertTarMemberContains(tar_handle, member, '2')
++++++ gittests.py ++++++
#!/usr/bin/python

from   githgtests  import GitHgTests
from   gitfixtures import GitFixtures
from   utils       import run_git

class GitTests(GitHgTests):
    """tar_scm tests run against a git fixture repository."""

    scm = 'git'
    initial_clone_command = 'git clone'
    update_cache_command  = 'git fetch'
    fixtures_class = GitFixtures

    # Placeholders handed to --versionformat by the shared git/hg tests.
    abbrev_hash_format = '%h'
    timestamp_format   = '%at'

    def default_version(self):
        """Version expected when no --version/--versionformat is given:
        the timestamp looked up for revision 2."""
        second_rev = self.rev(2)
        return self.timestamps(second_rev)
++++++ hgfixtures.py ++++++
#!/usr/bin/python

import os

from   fixtures  import Fixtures
from   utils     import mkfreshdir, run_hg

class HgFixtures(Fixtures):
    def init(self):
        self.create_repo()

        self.timestamps   = { }
        self.sha1s        = { }

        self.create_commits(2)

    def run(self, cmd):
        return run_hg(self.repo_path, cmd)

    def create_repo(self):
        os.makedirs(self.repo_path)
        os.chdir(self.repo_path)
        self.run('init')
        c = open('.hg/hgrc', 'w')
        c.write("[ui]\nusername = %s\n" % self.name_and_email)
        c.close()
        self.wd = self.repo_path
        print "created repo", self.repo_path

    def do_commit(self, newly_created):
        self.run('add .')
        self.run('commit -m%d' % self.next_commit_rev)

    def get_metadata(self, formatstr):
        return self.run('log -l1 --template "%s"' % formatstr)[0]

    def record_rev(self, rev_num):
        tag = str(rev_num - 1) # hg starts counting changesets at 0
        self.revs[rev_num]   = tag
        self.timestamps[tag] = self.get_metadata('{date}')
        self.sha1s[tag]      = self.get_metadata('{node|short}')
        self.scmlogs.annotate(
            "Recorded rev %d: id %s, timestamp %s, SHA1 %s" % \
                (rev_num,
                 tag,
                 self.timestamps[tag],
                 self.sha1s[tag])
        )
++++++ hgtests.py ++++++
#!/usr/bin/python

from   githgtests  import GitHgTests
from   hgfixtures  import HgFixtures
from   utils       import run_hg

class HgTests(GitHgTests):
    """tar_scm tests run against a Mercurial fixture repository."""

    scm = 'hg'
    initial_clone_command = 'hg clone'
    update_cache_command  = 'hg pull'
    fixtures_class = HgFixtures

    # Placeholders handed to --versionformat by the shared git/hg tests.
    abbrev_hash_format = '{node|short}'
    timestamp_format   = '{date}'

    def default_version(self):
        """Version expected when no --version/--versionformat is given."""
        second_rev = self.rev(2)
        return second_rev
++++++ scm-wrapper ++++++
#!/bin/bash

# Wrapper around SCM to enable behaviour verification testing
# on tar_scm's repository caching code.  This is cleaner than
# writing tests which look inside the cache, because then they
# become coupled to the cache's implementation, and require
# knowledge of where the cache lives etc.
#
# Usage: create a symlink to this script named after the SCM binary
# (e.g. git) and put its directory first in $PATH.  Every invocation
# is appended to the file named by $SCM_INVOCATION_LOG before the
# real binary in /usr/bin is run with the same arguments.

# Quote $0 so that a path containing whitespace is handled correctly.
me=$(basename "$0")

if [ -z "$SCM_INVOCATION_LOG" ]; then
    cat <<EOF >&2
\$SCM_INVOCATION_LOG must be set before calling $0.
It should be invoked from the test suite, not directly.
EOF
    exit 1
fi

# The name of the symlink selects the real binary, so running the
# script under its own name makes no sense.
if [ "$me" = 'scm-wrapper' ]; then
    echo "$me should not be invoked directly, only via symlink" >&2
    exit 1
fi

echo "$me $*" >> "$SCM_INVOCATION_LOG"

# Replace this shell with the real binary; its exit status becomes
# the wrapper's exit status.
exec "/usr/bin/$me" "$@"
++++++ scmlogs.py ++++++
#!/usr/bin/python

import glob
import os

class ScmInvocationLogs:
    """
    Provides log files which tracks invocations of SCM binaries.  The
    tracking is done via a wrapper around SCM to enable behaviour
    verification testing on tar_scm's repository caching code.  This
    is cleaner than writing tests which look inside the cache, because
    then they become coupled to the cache's implementation, and
    require knowledge of where the cache lives etc.

    One instance should be constructed per unit test.  If the test
    invokes the SCM binary multiple times, invoke next() in between
    each, so that a separate log file is used for each invocation -
    this allows more accurate fine-grained assertions on the
    invocation log.

    Note that ``current_log_path`` only exists after the first call to
    next(), so annotate() and read() must not be used before that.
    """

    @classmethod
    def setup_bin_wrapper(cls, scm, tmp_dir):
        """
        Create ``<tmp_dir>/wrappers/<scm>`` as a symlink to
        '../../scm-wrapper' and put the wrappers directory at the front
        of $PATH.  Safe to call repeatedly.
        """
        cls.wrapper_dir = tmp_dir + '/wrappers'

        if not os.path.exists(cls.wrapper_dir):
            os.makedirs(cls.wrapper_dir)

        wrapper = cls.wrapper_dir + '/' + scm
        # lexists() also detects a dangling symlink, which exists()
        # reports as missing and which would make os.symlink() fail.
        if not os.path.lexists(wrapper):
            os.symlink('../../scm-wrapper', wrapper)

        path = os.environ.get('PATH') or ''
        prepend = cls.wrapper_dir + ':'

        if path == cls.wrapper_dir or path.startswith(prepend):
            return
        # Avoid a trailing ':' when PATH was empty or unset: an empty
        # PATH element would mean the current directory.
        os.environ['PATH'] = (prepend + path) if path else cls.wrapper_dir

    def __init__(self, scm, test_dir):
        """
        scm      -- name of the SCM binary, used in the log file names
        test_dir -- directory holding the log files; logs left over
                    from an earlier run are deleted
        """
        self.scm = scm
        self.test_dir = test_dir
        self.counter = 0
        self.unlink_existing_logs()

    def get_log_file_template(self):
        """Return the log file name with a '%s' slot for the suffix."""
        return '%s-invocation-%%s.log' % self.scm

    def get_log_path_template(self):
        """Return get_log_file_template() joined onto test_dir."""
        return os.path.join(self.test_dir, self.get_log_file_template())

    def unlink_existing_logs(self):
        """Delete every invocation log of this SCM found in test_dir."""
        pat = self.get_log_path_template() % '*'
        for log in glob.glob(pat):
            os.unlink(log)

    def get_log_file(self, identifier):
        """
        Return the log file name for the current counter value, with
        '-<identifier>' appended when ``identifier`` is non-empty.
        """
        suffix = ('-' + identifier) if identifier else ''
        return self.get_log_file_template() % (
            '%02d%s' % (self.counter, suffix))

    def get_log_path(self, identifier):
        """Return get_log_file(identifier) joined onto test_dir."""
        return os.path.join(self.test_dir, self.get_log_file(identifier))

    def next(self, identifier=''):
        """
        Switch to a fresh log file and export its path through the
        SCM_INVOCATION_LOG environment variable.

        Raises RuntimeError if the new log file already exists.
        """
        self.counter += 1
        self.current_log_path = self.get_log_path(identifier)
        if os.path.exists(self.current_log_path):
            raise RuntimeError(
                "%s already existed?!" % self.current_log_path)
        # Assigning to os.environ calls putenv() as well and, unlike a
        # bare os.putenv(), keeps os.environ itself up to date.
        os.environ['SCM_INVOCATION_LOG'] = self.current_log_path

    def annotate(self, msg):
        """Append ``msg`` to the current log as a '# ' line; also print it."""
        with open(self.current_log_path, 'a') as log:
            log.write('# ' + msg + "\n")
            print(msg)

    def read(self):
        """
        Return the lines of the current log as a list, or the string
        '<no SCM log>' (with the SCM name filled in) when the log file
        does not exist.
        """
        if not os.path.exists(self.current_log_path):
            return '<no %s log>' % self.scm

        with open(self.current_log_path) as log:
            return log.readlines()
++++++ svnfixtures.py ++++++
#!/usr/bin/python

import os

from   fixtures  import Fixtures
from   utils     import mkfreshdir, quietrun, run_svn

class SvnFixtures(Fixtures):
    def init(self):
        self.wd_path = self.container_dir + '/wd'

        self.create_repo()
        self.checkout_repo()

        self.added        = { }
        self.timestamps   = { }

        self.create_commits(2)

    def run(self, cmd):
        return run_svn(self.wd_path, cmd)

    def create_repo(self):
        quietrun('svnadmin create ' + self.repo_path)
        print "created repo", self.repo_path

    def checkout_repo(self):
        mkfreshdir(self.wd_path)
        quietrun('svn checkout %s %s' % (self.repo_url, self.wd_path))
        self.wd = self.wd_path

    def do_commit(self, newly_created):
        for new in newly_created:
            if not new in self.added:
                self.run('add ' + new)
                self.added[new] = True
        self.run('commit -m%d' % self.next_commit_rev)

    def get_metadata(self, formatstr):
        return self.run('log -n1' % formatstr)[0]

    def record_rev(self, rev_num):
        self.revs[rev_num] = str(rev_num)
        self.scmlogs.annotate("Recorded rev %d" % rev_num)
++++++ svntests.py ++++++
#!/usr/bin/python

from   commontests import CommonTests
from   svnfixtures import SvnFixtures
from   utils       import run_svn

class SvnTests(CommonTests):
    """tar_scm tests run against a Subversion fixture repository."""

    scm = 'svn'
    # These two values contain regex syntax (alternation, '?').
    initial_clone_command = 'svn (co|checkout) '
    update_cache_command  = 'svn up(date)?'
    fixtures_class = SvnFixtures

    def default_version(self):
        """Version expected when no --version/--versionformat is given."""
        second_rev = self.rev(2)
        return second_rev

    def test_versionformat_rev(self):
        """%r in --versionformat expands to the revision number."""
        version_format = 'myrev%r.svn'
        self.tar_scm_std('--versionformat', version_format)
        tarball = self.basename(version = 'myrev2.svn')
        self.assertTarOnly(tarball)

    def test_version_versionformat(self):
        """--versionformat decides the tarball name even with --version."""
        version_format = 'myrev%r.svn'
        self.tar_scm_std('--version', '3.0', '--versionformat', version_format)
        tarball = self.basename(version = 'myrev2.svn')
        self.assertTarOnly(tarball)

    def test_versionformat_revision(self):
        """--revision pins both the formatted version and the content."""
        self.fixtures.create_commits(4)
        wanted_rev = self.rev(2)
        self.tar_scm_std('--versionformat', 'foo%r', '--revision', wanted_rev)
        basename = self.basename(version = 'foo2')
        tar_handle = self.assertTarOnly(basename)
        self.assertTarMemberContains(tar_handle, basename + '/a', '2')
++++++ tar_scm ++++++
++++ 704 lines (skipped)
++++ between /work/SRC/openSUSE:Factory/obs-service-tar_scm/tar_scm
++++ and /work/SRC/openSUSE:Factory/.obs-service-tar_scm.new/tar_scm

++++++ tar_scm.rc ++++++
#
# Define a cache directory here to avoid repeating downloads of the
# same file. This works on the server and on the client side.
#
# Note that this file can be overridden with per-user config placed
# in ~/.obs/tar_scm
#
# WARNING: you need to create three directories inside, when changing from
# default:
#          mkdir -p repo{,url} incoming
#
#CACHEDIRECTORY="/var/cache/obs/tar_scm"
++++++ tar_scm.service ++++++
--- /var/tmp/diff_new_pack.G30PbZ/_old  2012-02-17 12:18:42.000000000 +0100
+++ /var/tmp/diff_new_pack.G30PbZ/_new  2012-02-17 12:18:42.000000000 +0100
@@ -16,6 +16,12 @@
   <parameter name="subdir">
     <description>package just a sub directory</description>
   </parameter>
+  <parameter name="version">
+    <description>Specify version to be used in tarball. Defaults to 
automatically detected value formatted by versionformat parameter. If using 
keep-source, you should set this to a fixed constant which will be used to name 
the checked out directory.</description>
+  </parameter>
+  <parameter name="versionformat">
+    <description>Auto-generate version from checked out source using this 
format string. For git, value is passed via git show --pretty=format:... 
(default '%at'); for hg, via hg log --template=... (default '{rev}'); for bzr 
and svn, %r is revision (default '%r'). Overrides tarball name defined by 
version parameter.</description>
+  </parameter>
   <parameter name="versionprefix">
     <description>specify a base version as prefix.</description>
   </parameter>
@@ -23,13 +29,13 @@
     <description>specify a revision</description>
   </parameter>
   <parameter name="filename">
-    <description>base file name to be created</description>
+    <description>name of package - used together with version to determine 
tarball name</description>
   </parameter>
   <parameter name="exclude">
-    <description>for sepcifing excludes when creating the tar 
ball</description>
+    <description>for specifying excludes when creating the tar 
ball</description>
   </parameter>
-  <parameter name="version">
-    <description>version to be used in tar. Setting it to an empty string is 
disabling the version tag.</description>
+  <parameter name="include">
+    <description>for specifying subset of files/subdirectories to pack in the 
tar ball</description>
   </parameter>
   <parameter name="package-meta">
     <description>Package the meta data of SCM to allow the user or OBS to 
update after un-tar</description>

++++++ test.py ++++++
#!/usr/bin/python

import sys
import unittest

from gittests import GitTests
from svntests import SvnTests
from hgtests  import HgTests
from bzrtests import BzrTests

if __name__ == '__main__':
    # Collect the tests of every SCM-specific test class into one suite.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for testclass in (SvnTests, GitTests, HgTests, BzrTests):
        suite.addTests(loader.loadTestsFromTestCase(testclass))

    runner_args = {
        #'verbosity' : 2,
    }
    if sys.version_info >= (2, 7):
        # New in 2.7
        runner_args['buffer'] = True
        #runner_args['failfast'] = True

    result = unittest.TextTestRunner(**runner_args).run(suite)

    # Exit status 0 on success, 1 otherwise.
    sys.exit(0 if result.wasSuccessful() else 1)
++++++ testassertions.py ++++++
#!/usr/bin/python

import os
from pprint import pprint, pformat
import re
import tarfile
import unittest

# Regex fragment that matches either the start of the text or a newline
# character, i.e. the position at which a line begins.
line_start = '(^|\n)'

class TestAssertions(unittest.TestCase):
    """Extra assertions shared by the tar_scm test cases.

    Several helpers read attributes that this class does not define and
    that must therefore be supplied by whatever it is combined with:
    ``self.outdir`` (checkTar and the assertTar* helpers) and
    ``self.initial_clone_command`` / ``self.update_cache_command``
    (assertRanInitialClone / assertRanUpdate).
    """

    ######################################################################
    # backported from 2.7 just in case we're running on an older Python
    def assertRegexpMatches(self, text, expected_regexp, msg=None):
        """Fail the test unless the text matches the regular expression.

        expected_regexp may be a pattern string or a compiled pattern.
        """
        # re.compile() hands back an already-compiled pattern unchanged,
        # so no string type check (Python 2-only ``basestring``) is needed.
        expected_regexp = re.compile(expected_regexp)
        if not expected_regexp.search(text):
            msg = msg or "Regexp didn't match"
            msg = '%s: %r not found in %r' % (
                msg, expected_regexp.pattern, text)
            raise self.failureException(msg)

    def assertNotRegexpMatches(self, text, unexpected_regexp, msg=None):
        """Fail the test if the text matches the regular expression.

        unexpected_regexp may be a pattern string or a compiled pattern.
        """
        unexpected_regexp = re.compile(unexpected_regexp)
        match = unexpected_regexp.search(text)
        if match:
            msg = msg or "Regexp matched"
            msg = '%s: %r matches %r in %r' % (msg,
                                               text[match.start():match.end()],
                                               unexpected_regexp.pattern,
                                               text)
            raise self.failureException(msg)
    ######################################################################

    def assertNumDirents(self, dir, expected, msg = ''):
        """Assert directory `dir` holds exactly `expected` entries.

        Returns the list of entry names (as given by os.listdir).
        """
        dirents = os.listdir(dir)
        got = len(dirents)
        if msg:
            msg += "\n"
        msg += 'expected %d file(s), got %d: %s' % (
            expected, got, pformat(dirents))
        self.assertEqual(expected, got, msg)
        return dirents

    def assertNumTarEnts(self, tar, expected, msg = ''):
        """Assert tar file `tar` has exactly `expected` members.

        Returns (open TarFile handle, list of TarInfo members).  The
        handle is deliberately left open because callers go on to read
        members from it.
        """
        self.assertTrue(tarfile.is_tarfile(tar))
        th = tarfile.open(tar)
        tarents = th.getmembers()
        got = len(tarents)
        if msg:
            msg += "\n"
        msg += 'expected %s to have %d entries, got %d:\n%s' % \
            (tar, expected, got, pformat(tarents))
        self.assertEqual(expected, got, msg)
        return th, tarents

    def assertStandardTar(self, tar, top):
        """Assert `tar` holds exactly top, top/a, top/subdir, top/subdir/b.

        Returns the open TarFile handle.
        """
        th, entries = self.assertNumTarEnts(tar, 4)
        # key= works on every Python >= 2.4; a cmp function does not
        # work on Python 3.
        entries.sort(key=lambda entry: entry.name)
        self.assertEqual(entries[0].name, top)
        self.assertEqual(entries[1].name, top + '/a')
        self.assertEqual(entries[2].name, top + '/subdir')
        self.assertEqual(entries[3].name, top + '/subdir/b')
        return th

    def assertSubdirTar(self, tar, top):
        """Assert `tar` holds exactly top and top/b.

        Returns the open TarFile handle.
        """
        th, entries = self.assertNumTarEnts(tar, 2)
        entries.sort(key=lambda entry: entry.name)
        self.assertEqual(entries[0].name, top)
        self.assertEqual(entries[1].name, top + '/b')
        return th

    def checkTar(self, tar, tarbasename, toptardir=None, tarchecker=None):
        """Check the tar named `tar` (relative to self.outdir).

        The file name must be '<tarbasename>.tar'.  Its contents are
        verified by `tarchecker` (default: assertStandardTar), which is
        called with the tar path and the expected top-level directory
        `toptardir` (default: tarbasename).  Returns whatever the
        checker returns.
        """
        if not toptardir:
            toptardir = tarbasename
        if not tarchecker:
            tarchecker = self.assertStandardTar

        self.assertEqual(tar, '%s.tar' % tarbasename)
        tarpath = os.path.join(self.outdir, tar)
        return tarchecker(tarpath, toptardir)

    def assertTarOnly(self, tarbasename, **kwargs):
        """Assert self.outdir contains one entry, the expected tar.

        Extra keyword arguments are passed through to checkTar.
        """
        dirents = self.assertNumDirents(self.outdir, 1)
        return self.checkTar(dirents[0], tarbasename, **kwargs)

    def assertTarAndDir(self, tarbasename, dirname=None, **kwargs):
        """Assert self.outdir contains exactly a tar and a directory.

        The directory must be named `dirname` (default: tarbasename).
        Extra keyword arguments are passed through to checkTar.
        """
        if not dirname:
            dirname = tarbasename

        dirents = self.assertNumDirents(self.outdir, 2)
        pprint(dirents)

        # os.listdir order is arbitrary, so work out which one is the tar.
        if dirents[0].endswith('.tar'):
            tar = dirents[0]
            wd  = dirents[1]
        elif dirents[1].endswith('.tar'):
            tar = dirents[1]
            wd  = dirents[0]
        else:
            self.fail('no .tar found in ' + self.outdir)

        self.assertEqual(wd, dirname)
        self.assertTrue(os.path.isdir(os.path.join(self.outdir, wd)),
                        dirname + ' should be directory')

        return self.checkTar(tar, tarbasename, **kwargs)

    def assertTarMemberContains(self, th, tarmember, contents):
        """Assert member `tarmember` of open tar `th` equals `contents`."""
        f = th.extractfile(tarmember)
        # Compare against the raw member data.  readlines() keeps the
        # line terminators, so joining its result with "\n" would double
        # every newline of a multi-line member.
        data = f.read()
        if not isinstance(data, str):
            # Python 3 returns bytes here; `contents` is given as text.
            data = data.decode()
        self.assertEqual(contents, data)

    def assertRanInitialClone(self, logpath, loglines):
        """Assert the log shows an initial clone and no cache update."""
        self._find(logpath, loglines,
                   self.initial_clone_command, self.update_cache_command)

    def assertRanUpdate(self, logpath, loglines):
        """Assert the log shows a cache update and no initial clone."""
        self._find(logpath, loglines,
                   self.update_cache_command, self.initial_clone_command)

    def _find(self, logpath, loglines, should_find, should_not_find):
        """Scan `loglines` (a list of strings) for two regexps.

        Fails if any line matches `should_not_find` anywhere, or if no
        line matches `should_find` at its start.  `logpath` is only used
        in the failure messages.
        """
        print("#### " + should_find)
        regexp = re.compile('^' + should_find)
        log = "".join(loglines)
        # Both messages are loop-invariant, so build them once.
        unwanted_msg = (
            "Shouldn't find /%s/ in %s; log was:\n"
            "----\n%s\n----\n"
            % (should_not_find, logpath, log))
        missing_msg = (
            "Didn't find /%s/ in %s; log was:\n"
            "----\n%s\n----\n"
            % (regexp.pattern, logpath, log))

        found = False
        for line in loglines:
            self.assertNotRegexpMatches(line, should_not_find, unwanted_msg)
            if regexp.search(line):
                found = True
        self.assertTrue(found, missing_msg)
++++++ testenv.py ++++++
#!/usr/bin/python

import os
import shutil
from utils     import mkfreshdir, run_cmd
from scmlogs   import ScmInvocationLogs

class TestEnvironment:
    """Per-test directory layout and tar_scm invocation helpers.

    The methods rely on attributes this class does not define and which
    must come from whatever it is combined with: ``scm`` and
    ``fixtures_class``, plus ``_testMethodName`` and ``assertEqual``
    (presumably from unittest.TestCase -- confirm against the concrete
    test classes).
    """
    tests_dir   = os.path.abspath(os.path.dirname(__file__)) # os.getcwd()
    tmp_dir     = tests_dir + '/tmp'
    is_setup    = False

    @classmethod
    def tar_scm_bin(cls):
        """Return the path of the tar_scm executable inside tests_dir.

        Raises RuntimeError if no regular file exists at that path.
        """
        tar_scm = cls.tests_dir + '/tar_scm'
        if not os.path.isfile(tar_scm):
            raise RuntimeError(
                "Failed to find tar_scm executable at " + tar_scm)
        return tar_scm

    @classmethod
    def setupClass(cls):
        """One-off set-up per class; later calls are no-ops."""
        # deliberately not setUpClass - we emulate the behaviour
        # to support Python < 2.7
        if cls.is_setup:
            return
        print("++++++ setupClass ++++++")
        ScmInvocationLogs.setup_bin_wrapper(cls.scm, cls.tmp_dir)
        os.putenv('DEBUG_TAR_SCM', 'yes')
        cls.is_setup = True

    def calcPaths(self):
        """Derive test_name and the per-test directories.

        The test name is the test method name without its 'test_'
        prefix; everything lives under tmp_dir/<scm>/<test_name>.
        Raises RuntimeError if the method name lacks that prefix.
        """
        if not self._testMethodName.startswith('test_'):
            raise RuntimeError(
                "unexpected test method name: " + self._testMethodName)
        self.test_name = self._testMethodName[5:]
        self.test_dir  = os.path.join(self.tmp_dir,  self.scm, self.test_name)
        self.pkgdir    = os.path.join(self.test_dir, 'pkg')
        self.outdir    = os.path.join(self.test_dir, 'out')
        self.cachedir  = os.path.join(self.test_dir, 'cache')

    def setUp(self):
        """Prepare logs, directories and fixtures, then enter pkgdir."""
        self.setupClass()
        print("++++++ setUp ++++++")

        self.calcPaths()

        self.scmlogs = ScmInvocationLogs(self.scm, self.test_dir)
        self.scmlogs.next('fixtures')

        self.initDirs()

        self.fixtures = self.fixtures_class(self.test_dir, self.scmlogs)
        self.fixtures.setup()

        self.scmlogs.next('start-test')
        self.scmlogs.annotate('Starting %s test' % self.test_name)

        os.putenv('CACHEDIRECTORY', self.cachedir)
        # osc launches source services with cwd as pkg dir
        os.chdir(self.pkgdir)

    def initDirs(self):
        """Create pkgdir if missing and recreate the cache subdirs."""
        # pkgdir persists between tests to simulate real world use
        # (although a test can choose to invoke mkfreshdir)
        if not os.path.exists(self.pkgdir):
            os.makedirs(self.pkgdir)

        for subdir in ('repo', 'repourl', 'incoming'):
            mkfreshdir(os.path.join(self.cachedir, subdir))

    def disableCache(self):
        """Remove CACHEDIRECTORY from the environment of child processes."""
        os.unsetenv('CACHEDIRECTORY')

    def tearDown(self):
        """Run the post-test step (see postRun)."""
        print("++++++ tearDown ++++++")
        self.postRun()

    def postRun(self):
        """Set service mode 'disabled' and, if outdir exists, simulate
        osc's post-run file move."""
        print("++++++ postRun +++++++")
        self.service = { 'mode' : 'disabled' }
        if os.path.exists(self.outdir):
            self.simulate_osc_postrun()

    def simulate_osc_postrun(self):
        """
        Simulate osc copying files from temporary --outdir back to
        package source directory, so our tests can catch any
        potential side-effects due to the persistent nature of the
        package source directory.

        Reads self.service: 'mode' is required; 'callmode' is optional;
        'name' is required only when neither selects a local mode.
        """

        temp_dir = self.outdir
        pkg_dir = self.pkgdir
        service = self.service

        # Adapted from osc/core.py Serviceinfo.execute().  The original
        # snippet referred to free variables `callmode` and `name` that do
        # not exist in this scope (a NameError as soon as the first three
        # mode checks fail), so both are read from the service dict.
        local_modes = ("disabled", "trylocal", "localonly")
        callmode = service.get('callmode')
        if service['mode'] in local_modes or callmode in ("local", "trylocal"):
            prefix = ""
        else:
            prefix = "_service:" + service['name'] + ":"

        for filename in os.listdir(temp_dir):
            shutil.move(os.path.join(temp_dir, filename),
                        os.path.join(pkg_dir, prefix + filename))

    def tar_scm_std(self, *args, **kwargs):
        """Run tar_scm with the standard --url/--scm arguments plus `args`.

        Keyword arguments are passed through to tar_scm().
        """
        return self.tar_scm(self.stdargs(*args), **kwargs)

    def tar_scm_std_fail(self, *args):
        """Like tar_scm_std, but expect tar_scm to exit non-zero."""
        return self.tar_scm(self.stdargs(*args), should_succeed=False)

    def stdargs(self, *args):
        """Return the standard --url/--scm argument list followed by args."""
        return [ '--url', self.fixtures.repo_url, '--scm', self.scm ] + \
            list(args)

    def tar_scm(self, args, should_succeed=True):
        """Run tar_scm under bash with `args` plus a fresh --outdir.

        Echoes the command and its output, asserts that success/failure
        matches `should_succeed`, and returns (stdout, stderr, ret).
        stderr is redirected into stdout by the command line itself.
        """
        # Local import: only needed to echo output without adding a newline.
        import sys

        # simulate new temporary outdir for each tar_scm invocation
        mkfreshdir(self.outdir)
        cmdargs = args + [ '--outdir', self.outdir ]
        quotedargs = [ "'%s'" % arg for arg in cmdargs ]
        cmdstr = 'bash %s %s 2>&1' % (self.tar_scm_bin(), " ".join(quotedargs))
        print("\n")
        print("-" * 70)
        print("Running " + cmdstr)
        (stdout, stderr, ret) = run_cmd(cmdstr)
        if stdout:
            print("STDOUT:")
            print("------")
            sys.stdout.write(stdout)
        if stderr:
            print("STDERR:")
            print("------")
            sys.stdout.write(stderr)
        print("-" * 70)
        succeeded = ret == 0
        self.assertEqual(succeeded, should_succeed)
        return (stdout, stderr, ret)

    def rev(self, rev):
        """Look up `rev` in the fixtures' revs mapping."""
        return self.fixtures.revs[rev]

    def timestamps(self, rev):
        """Look up `rev` in the fixtures' timestamps mapping."""
        return self.fixtures.timestamps[rev]

    def sha1s(self, rev):
        """Look up `rev` in the fixtures' sha1s mapping."""
        return self.fixtures.sha1s[rev]

++++++ utils.py ++++++
#!/usr/bin/python

import os
import re
import shutil
import subprocess

def mkfreshdir(path):
    """Make `path` an empty directory, deleting any previous contents.

    As a guard against wiping an unintended tree, `path` must contain a
    '/tmp' component preceded by at least ten characters; otherwise
    RuntimeError is raised and nothing is touched.

    Side effect: the process's working directory is changed to '/' and
    is not restored.
    """
    if not re.search('.{10}/tmp(/|$)', path):
        raise RuntimeError('unsafe call: mkfreshdir(%s)' % path)

    # Presumably done so the process is never left sitting inside a
    # directory that the rmtree below removes.
    os.chdir('/')
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)

def run_cmd(cmd):
    """Run `cmd` through the shell and wait for it to finish.

    Returns a (stdout, stderr, returncode) tuple holding the captured
    output of both streams and the exit status.
    """
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out, err = proc.communicate()
    return (out, err, proc.returncode)

def quietrun(cmd):
    """Run `cmd` via run_cmd, reporting output only on failure.

    On a non-zero exit status the command, its stdout and its stderr
    are printed.  Always returns run_cmd's (stdout, stderr, returncode).
    """
    (stdout, stderr, ret) = run_cmd(cmd)
    if ret != 0:
        # Single-argument print calls behave the same on Python 2 and 3;
        # the text matches the old `print cmd, " failed!"` statement.
        print("%s  failed!" % (cmd,))
        print(stdout)
        print(stderr)
    return (stdout, stderr, ret)

def run_scm(scm, repo, opts):
    """Run `<scm> <opts>` from inside directory `repo`.

    The command line is assembled as a shell string and handed to
    quietrun(), whose (stdout, stderr, returncode) result is returned.
    """
    return quietrun('cd %s && %s %s' % (repo, scm, opts))

def run_git(repo, opts):
    """Run `git <opts>` inside `repo`; see run_scm for the return value."""
    return run_scm('git', repo=repo, opts=opts)

def run_svn(repo, opts):
    """Run `svn <opts>` inside `repo`; see run_scm for the return value."""
    return run_scm('svn', repo=repo, opts=opts)

def run_hg(repo, opts):
    """Run `hg <opts>` inside `repo`; see run_scm for the return value."""
    return run_scm('hg', repo=repo, opts=opts)

def run_bzr(repo, opts):
    """Run `bzr <opts>` inside `repo`; see run_scm for the return value."""
    return run_scm('bzr', repo=repo, opts=opts)
-- 
To unsubscribe, e-mail: opensuse-commit+unsubscr...@opensuse.org
For additional commands, e-mail: opensuse-commit+h...@opensuse.org

Reply via email to