Use the new _alpm_for_each_cpu() API to perform integrity checks in
parallel.  This speeds up integrity checks drastically on multi-core
machines.
---
 lib/libalpm/sync.c |  186 +++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 141 insertions(+), 45 deletions(-)

diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c
index 859b8c9..52c87e1 100644
--- a/lib/libalpm/sync.c
+++ b/lib/libalpm/sync.c
@@ -32,6 +32,7 @@
 #include <unistd.h>
 #include <time.h>
 #include <limits.h>
+#include <pthread.h>
 
 /* libalpm */
 #include "sync.h"
@@ -687,6 +688,126 @@ static int test_md5sum(pmtrans_t *trans, const char *filename,
        return(ret);
 }
 
+typedef struct { /* Shared argument block for _alpm_check_delta_integrity() workers */
+       pmtrans_t *trans;       /* transaction passed through to test_md5sum() */
+       alpm_list_t *deltas;    /* list of pmdelta_t entries to verify */
+       alpm_list_t **data;     /* receives strdup()ed filenames of deltas that fail */
+       pthread_mutex_t *mutex; /* held by a worker except while test_md5sum() runs */
+} _alpm_delta_integrity_payload;
+
+static int _alpm_check_delta_integrity(void *ptr, int thread, int numcpus)
+{
+       /* _alpm_for_each_cpu() worker: md5-check this thread's slice of deltas.
+        * Returns the failure count; failing filenames are added to *data. */
+       _alpm_delta_integrity_payload *payload = ptr;
+       pmtrans_t *trans = payload->trans;
+       alpm_list_t **data = payload->data;
+       pthread_mutex_t *mutex = payload->mutex;
+       size_t numdeltas = alpm_list_count(payload->deltas);
+
+       /* This thread owns the deltas with index in [count, range) */
+       alpm_list_t *i;
+       size_t count = 0, range = (numdeltas * thread) / numcpus;
+       for (i = payload->deltas; count < range; i = i->next, count++);
+
+       int errors = 0;
+       range = (numdeltas * (thread + 1)) / numcpus;
+       pthread_mutex_lock(mutex);
+       /* Resume at the slice start; restarting at the head would recheck every delta */
+       for(; count < range; i = i->next, count++) {
+               pmdelta_t *d = alpm_list_getdata(i);
+               const char *filename = alpm_delta_get_filename(d);
+               const char *md5sum = alpm_delta_get_md5sum(d);
+               /* Calculate md5sums in parallel; the lock is dropped meanwhile */
+               pthread_mutex_unlock(mutex);
+               int test = test_md5sum(trans, filename, md5sum);
+               pthread_mutex_lock(mutex);
+               if(test != 0) {
+                       errors++;
+                       *data = alpm_list_add(*data, strdup(filename));
+               }
+       }
+       pthread_mutex_unlock(mutex);
+       return errors;
+}
+
+typedef struct { /* Shared argument block for _alpm_check_integrity() workers */
+       pmtrans_t *trans;       /* transaction whose add list is verified */
+       alpm_list_t **data;     /* receives strdup()ed filenames of targets that fail */
+       size_t numtargs;        /* number of targets; used to split work and scale progress */
+       size_t *current;        /* shared counter of processed targets, reported via PROGRESS */
+       pthread_mutex_t *mutex; /* held while touching *data, *current and list entries */
+} _alpm_integrity_payload;
+
+static int _alpm_check_integrity(void *ptr, int thread, int numcpus)
+{
+       /* _alpm_for_each_cpu() worker: verify and load this thread's slice of
+        * trans->add. Returns the number of targets that failed. */
+       _alpm_integrity_payload *payload = ptr;
+       pmtrans_t *trans = payload->trans;
+       alpm_list_t **data = payload->data;
+       size_t numtargs = payload->numtargs;
+       size_t *current = payload->current;
+       pthread_mutex_t *mutex = payload->mutex;
+
+       /* This thread owns the targets with index in [count, range) */
+       alpm_list_t *i;
+       size_t count = 0, range = (numtargs * thread) / numcpus;
+       for (i = trans->add; count < range; i = i->next, count++);
+
+       int errors = 0;
+       range = (numtargs * (thread + 1)) / numcpus;
+       pthread_mutex_lock(mutex);
+       for(; count < range; i = i->next, count++) {
+               pmpkg_t *spkg = i->data;
+               if(spkg->origin == PKG_FROM_FILE) {
+                       /* pkg_load() has been already called, this package is valid;
+                        * it still counts as processed, as in the serial loop */
+                       ++*current;
+                       continue;
+               }
+               const char *filename = alpm_pkg_get_filename(spkg);
+               const char *md5sum = alpm_pkg_get_md5sum(spkg);
+               /* Calculate md5sums in parallel */
+               pthread_mutex_unlock(mutex);
+               int test = test_md5sum(trans, filename, md5sum);
+               pthread_mutex_lock(mutex);
+               if(test != 0) {
+                       errors++;
+                       *data = alpm_list_add(*data, strdup(filename));
+                       goto next;
+               }
+               /* load the package file and replace pkgcache entry with it in the target list */
+               /* TODO: alpm_pkg_get_db() will not work on this target anymore */
+               _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
+               char *filepath = _alpm_filecache_find(filename);
+               pmpkg_t *pkgfile = NULL; /* never hand an indeterminate pointer to _alpm_pkg_free() */
+               /* Load packages in parallel */
+               pthread_mutex_unlock(mutex);
+               int loaded = alpm_pkg_load(filepath, 1, &pkgfile);
+               pthread_mutex_lock(mutex);
+               if(loaded != 0) {
+                       _alpm_pkg_free(pkgfile);
+                       errors++;
+                       *data = alpm_list_add(*data, strdup(filename));
+                       FREE(filepath);
+                       goto next;
+               }
+               FREE(filepath);
+               pkgfile->reason = spkg->reason; /* copy over install reason */
+               i->data = pkgfile;
+               _alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
+
+       next:
+               ++*current;
+               int percent = (*current * 100) / numtargs;
+               PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
+                               numtargs, *current);
+       }
+       pthread_mutex_unlock(mutex);
+       return errors;
+}
+
 int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
 {
        alpm_list_t *i, *j, *files = NULL;
@@ -786,25 +907,25 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
        /* if we have deltas to work with */
        if(handle->usedelta && deltas) {
                int ret = 0;
-               errors = 0;
+
                /* Check integrity of deltas */
                EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_START, NULL, NULL);
 
-               for(i = deltas; i; i = i->next) {
-                       pmdelta_t *d = alpm_list_getdata(i);
-                       const char *filename = alpm_delta_get_filename(d);
-                       const char *md5sum = alpm_delta_get_md5sum(d);
+               static pthread_mutex_t delta_integrity_mutex = 
PTHREAD_MUTEX_INITIALIZER;
+               _alpm_delta_integrity_payload payload = {
+                       .trans  = trans,
+                       .deltas = deltas,
+                       .data   = data,
+                       .mutex  = &delta_integrity_mutex
+               };
+
+               errors = _alpm_for_each_cpu(_alpm_check_delta_integrity, 
&payload);
+               EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
 
-                       if(test_md5sum(trans, filename, md5sum) != 0) {
-                               errors++;
-                               *data = alpm_list_add(*data, strdup(filename));
-                       }
-               }
                if(errors) {
                        pm_errno = PM_ERR_DLT_INVALID;
                        goto error;
                }
-               EVENT(trans, PM_TRANS_EVT_DELTA_INTEGRITY_DONE, NULL, NULL);
 
                /* Use the deltas to generate the packages */
                EVENT(trans, PM_TRANS_EVT_DELTA_PATCHES_START, NULL, NULL);
@@ -821,41 +942,16 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
        numtargs = alpm_list_count(trans->add);
        EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);
 
-       errors = 0;
-       for(i = trans->add; i; i = i->next, current++) {
-               pmpkg_t *spkg = i->data;
-               int percent = (current * 100) / numtargs;
-               if(spkg->origin == PKG_FROM_FILE) {
-                       continue; /* pkg_load() has been already called, this 
package is valid */
-               }
-               PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
-                               numtargs, current);
-
-               const char *filename = alpm_pkg_get_filename(spkg);
-               const char *md5sum = alpm_pkg_get_md5sum(spkg);
+       static pthread_mutex_t integrity_mutex = PTHREAD_MUTEX_INITIALIZER;
+       _alpm_integrity_payload payload = {
+               .trans    = trans,
+               .data     = data,
+               .numtargs = numtargs,
+               .current  = &current,
+               .mutex    = &integrity_mutex
+       };
+       errors = _alpm_for_each_cpu(_alpm_check_integrity, &payload);
 
-               if(test_md5sum(trans, filename, md5sum) != 0) {
-                       errors++;
-                       *data = alpm_list_add(*data, strdup(filename));
-                       continue;
-               }
-               /* load the package file and replace pkgcache entry with it in 
the target list */
-               /* TODO: alpm_pkg_get_db() will not work on this target anymore 
*/
-               _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package 
file for target %s\n", spkg->name);
-               char *filepath = _alpm_filecache_find(filename);
-               pmpkg_t *pkgfile;
-               if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) {
-                       _alpm_pkg_free(pkgfile);
-                       errors++;
-                       *data = alpm_list_add(*data, strdup(filename));
-                       FREE(filepath);
-                       continue;
-               }
-               FREE(filepath);
-               pkgfile->reason = spkg->reason; /* copy over install reason */
-               i->data = pkgfile;
-               _alpm_pkg_free_trans(spkg); /* spkg has been removed from the 
target list */
-       }
        PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100,
                        numtargs, current);
        EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL);
-- 
1.7.4.1


Reply via email to