On 19 February 2011 06:28, Nezmer <[email protected]> wrote:
> Actually, The sysconf() method  works at least in FreeBSD and the man page
> says the sysconf interface is defined by POSIX.1

The sysconf() interface is specified by POSIX.1, but
_SC_NPROCESSORS_ONLN is a non-standard extension.

>> You can look at how x264 guys implemented this. Check out
>> x264_cpu_num_processors() in common/cpu.c in their source tree.

Seems to be basically a more platform-complete version of what I'm
doing here [1].  But yeah, we'd need something like that if pacman is
supposed to run on everything unix-based.  I don't imagine we care
much about Windows compatibility :)

>> I'm not sure hard-coding supported platforms through ifdefs would be
>> accepted in pacman/libalpm.

There's code that does this already in lib/libalpm/diskspace.c, for
example.  It's not like this would limit the supported platforms
either; we can just pretend there's only 1 core when we don't know how
to count them.

Anyway, I've made a quick and dirty patch as a proof of concept.  It
drops the time spent doing integrity checks from 9 seconds to 3
seconds on an 88MB update.  The improvements I plan to make are to
hide some of the threading behind an _alpm_for_each_cpu() API, use it
for delta integrity checks too, and divide the packages between the
cores evenly in terms of package size rather than package count.  And
of course add the missing error checks.

diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c
index 859b8c9..595abd9 100644
--- a/lib/libalpm/sync.c
+++ b/lib/libalpm/sync.c
@@ -32,6 +32,7 @@
 #include <unistd.h>
 #include <time.h>
 #include <limits.h>
+#include <pthread.h>

 /* libalpm */
 #include "sync.h"
@@ -687,6 +688,79 @@ static int test_md5sum(pmtrans_t *trans, const
char *filename,
        return(ret);
 }

+/* Thread payload for checking package integrity */
+typedef struct {
+       size_t thread, numcpus, numtargs;
+       volatile size_t *current;
+       pmtrans_t *trans;
+       int *errors;
+       alpm_list_t **data;
+
+       pthread_mutex_t *mutex;
+} _alpm_integrity_payload;
+
+static void *_alpm_integrity_check_thread(void *ptr)
+{
+       _alpm_integrity_payload *payload = ptr;
+       pmtrans_t *trans = payload->trans;
+
+       alpm_list_t *i;
+       size_t count = 0, range = (payload->numtargs * payload->thread) /
payload->numcpus;
+       for(i = trans->add; count < range; i = i->next, count++);
+
+       range = (payload->numtargs * (payload->thread + 1)) / payload->numcpus;
+       pthread_mutex_lock(payload->mutex);
+       for(; count < range; i = i->next, count++) {
+               pmpkg_t *spkg = i->data;
+               if(spkg->origin == PKG_FROM_FILE) {
+                       continue; /* pkg_load() has been already called, this 
package is valid */
+               }
+
+               const char *filename = alpm_pkg_get_filename(spkg);
+               const char *md5sum = alpm_pkg_get_md5sum(spkg);
+
+               pthread_mutex_unlock(payload->mutex);
+               int test = test_md5sum(trans, filename, md5sum);
+               pthread_mutex_lock(payload->mutex);
+
+               if (test != 0) {
+                       (*payload->errors)++;
+                       *payload->data = alpm_list_add(*payload->data, 
strdup(filename));
+                       goto next;
+               }
+               /* load the package file and replace pkgcache entry with it in 
the
target list */
+               /* TODO: alpm_pkg_get_db() will not work on this target anymore 
*/
+               _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package 
file
for target %s\n", spkg->name);
+               char *filepath = _alpm_filecache_find(filename);
+               pmpkg_t *pkgfile;
+
+               pthread_mutex_unlock(payload->mutex);
+               int loaded = alpm_pkg_load(filepath, 1, &pkgfile);
+               pthread_mutex_lock(payload->mutex);
+
+               if(loaded != 0) {
+                       _alpm_pkg_free(pkgfile);
+                       (*payload->errors)++;
+                       *payload->data = alpm_list_add(*payload->data, 
strdup(filename));
+                       FREE(filepath);
+                       goto next;
+               }
+               FREE(filepath);
+               pkgfile->reason = spkg->reason; /* copy over install reason */
+               i->data = pkgfile;
+               _alpm_pkg_free_trans(spkg); /* spkg has been removed from the 
target list */
+
+       next:
+               (*payload->current)++;
+               int percent = (*payload->current * 100) / payload->numtargs;
+               PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
+                               payload->numtargs, *payload->current);
+       }
+       pthread_mutex_unlock(payload->mutex);
+
+       return(NULL);
+}
+
 int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
 {
        alpm_list_t *i, *j, *files = NULL;
@@ -821,41 +895,36 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t
*db_local, alpm_list_t **data)
        numtargs = alpm_list_count(trans->add);
        EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);

-       errors = 0;
-       for(i = trans->add; i; i = i->next, current++) {
-               pmpkg_t *spkg = i->data;
-               int percent = (current * 100) / numtargs;
-               if(spkg->origin == PKG_FROM_FILE) {
-                       continue; /* pkg_load() has been already called, this 
package is valid */
-               }
-               PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
-                               numtargs, current);
+       long numcpus = sysconf(_SC_NPROCESSORS_ONLN);
+       if(numcpus < 1) {
+               numcpus = 1;
+       }

-               const char *filename = alpm_pkg_get_filename(spkg);
-               const char *md5sum = alpm_pkg_get_md5sum(spkg);
+       static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+       _alpm_integrity_payload payload = {
+               .numcpus  = numcpus,
+               .numtargs = numtargs,
+               .current  = &current,
+               .trans    = trans,
+               .errors   = &errors,
+               .data     = data,
+               .mutex    = &mutex
+       };

-               if(test_md5sum(trans, filename, md5sum) != 0) {
-                       errors++;
-                       *data = alpm_list_add(*data, strdup(filename));
-                       continue;
+       errors = 0;
+       {
+               pthread_t threads[numcpus];
+               _alpm_integrity_payload payloads[numcpus];
+               for(int i = 0; i < numcpus; i++) {
+                       payloads[i] = payload;
+                       payloads[i].thread = i;
+                       pthread_create(&threads[i], NULL, 
_alpm_integrity_check_thread,
&payloads[i]);
                }
-               /* load the package file and replace pkgcache entry with it in 
the
target list */
-               /* TODO: alpm_pkg_get_db() will not work on this target anymore 
*/
-               _alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package 
file
for target %s\n", spkg->name);
-               char *filepath = _alpm_filecache_find(filename);
-               pmpkg_t *pkgfile;
-               if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) {
-                       _alpm_pkg_free(pkgfile);
-                       errors++;
-                       *data = alpm_list_add(*data, strdup(filename));
-                       FREE(filepath);
-                       continue;
+               for(int i = 0; i < numcpus; i++) {
+                       pthread_join(threads[i], NULL);
                }
-               FREE(filepath);
-               pkgfile->reason = spkg->reason; /* copy over install reason */
-               i->data = pkgfile;
-               _alpm_pkg_free_trans(spkg); /* spkg has been removed from the 
target list */
        }
+
        PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100,
                        numtargs, current);
        EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL);

[1]: 
http://gitorious.org/dimension/dimension/blobs/master/libdimension/platform.c#line82

-- 
Tavian Barnes
diff --git a/lib/libalpm/sync.c b/lib/libalpm/sync.c
index 859b8c9..595abd9 100644
--- a/lib/libalpm/sync.c
+++ b/lib/libalpm/sync.c
@@ -32,6 +32,7 @@
 #include <unistd.h>
 #include <time.h>
 #include <limits.h>
+#include <pthread.h>
 
 /* libalpm */
 #include "sync.h"
@@ -687,6 +688,79 @@ static int test_md5sum(pmtrans_t *trans, const char *filename,
 	return(ret);
 }
 
+/* Thread payload for checking package integrity */
+typedef struct {
+	size_t thread, numcpus, numtargs;
+	volatile size_t *current;
+	pmtrans_t *trans;
+	int *errors;
+	alpm_list_t **data;
+
+	pthread_mutex_t *mutex;
+} _alpm_integrity_payload;
+
+static void *_alpm_integrity_check_thread(void *ptr)
+{
+	_alpm_integrity_payload *payload = ptr;
+	pmtrans_t *trans = payload->trans;
+
+	alpm_list_t *i;
+	size_t count = 0, range = (payload->numtargs * payload->thread) / payload->numcpus;
+	for(i = trans->add; count < range; i = i->next, count++);
+
+	range = (payload->numtargs * (payload->thread + 1)) / payload->numcpus;
+	pthread_mutex_lock(payload->mutex);
+	for(; count < range; i = i->next, count++) {
+		pmpkg_t *spkg = i->data;
+		if(spkg->origin == PKG_FROM_FILE) {
+			continue; /* pkg_load() has been already called, this package is valid */
+		}
+
+		const char *filename = alpm_pkg_get_filename(spkg);
+		const char *md5sum = alpm_pkg_get_md5sum(spkg);
+
+		pthread_mutex_unlock(payload->mutex);
+		int test = test_md5sum(trans, filename, md5sum);
+		pthread_mutex_lock(payload->mutex);
+
+		if (test != 0) {
+			(*payload->errors)++;
+			*payload->data = alpm_list_add(*payload->data, strdup(filename));
+			goto next;
+		}
+		/* load the package file and replace pkgcache entry with it in the target list */
+		/* TODO: alpm_pkg_get_db() will not work on this target anymore */
+		_alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
+		char *filepath = _alpm_filecache_find(filename);
+		pmpkg_t *pkgfile;
+
+		pthread_mutex_unlock(payload->mutex);
+		int loaded = alpm_pkg_load(filepath, 1, &pkgfile);
+		pthread_mutex_lock(payload->mutex);
+
+		if(loaded != 0) {
+			_alpm_pkg_free(pkgfile);
+			(*payload->errors)++;
+			*payload->data = alpm_list_add(*payload->data, strdup(filename));
+			FREE(filepath);
+			goto next;
+		}
+		FREE(filepath);
+		pkgfile->reason = spkg->reason; /* copy over install reason */
+		i->data = pkgfile;
+		_alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
+
+	next:
+		(*payload->current)++;
+		int percent = (*payload->current * 100) / payload->numtargs;
+		PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
+				payload->numtargs, *payload->current);
+	}
+	pthread_mutex_unlock(payload->mutex);
+
+	return(NULL);
+}
+
 int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
 {
 	alpm_list_t *i, *j, *files = NULL;
@@ -821,41 +895,36 @@ int _alpm_sync_commit(pmtrans_t *trans, pmdb_t *db_local, alpm_list_t **data)
 	numtargs = alpm_list_count(trans->add);
 	EVENT(trans, PM_TRANS_EVT_INTEGRITY_START, NULL, NULL);
 
-	errors = 0;
-	for(i = trans->add; i; i = i->next, current++) {
-		pmpkg_t *spkg = i->data;
-		int percent = (current * 100) / numtargs;
-		if(spkg->origin == PKG_FROM_FILE) {
-			continue; /* pkg_load() has been already called, this package is valid */
-		}
-		PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", percent,
-				numtargs, current);
+	long numcpus = sysconf(_SC_NPROCESSORS_ONLN);
+	if(numcpus < 1) {
+		numcpus = 1;
+	}
 
-		const char *filename = alpm_pkg_get_filename(spkg);
-		const char *md5sum = alpm_pkg_get_md5sum(spkg);
+	static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+	_alpm_integrity_payload payload = {
+		.numcpus  = numcpus,
+		.numtargs = numtargs,
+		.current  = &current,
+		.trans	  = trans,
+		.errors	  = &errors,
+		.data	  = data,
+		.mutex	  = &mutex
+	};
 
-		if(test_md5sum(trans, filename, md5sum) != 0) {
-			errors++;
-			*data = alpm_list_add(*data, strdup(filename));
-			continue;
+	errors = 0;
+	{
+		pthread_t threads[numcpus];
+		_alpm_integrity_payload payloads[numcpus];
+		for(int i = 0; i < numcpus; i++) {
+			payloads[i] = payload;
+			payloads[i].thread = i;
+			pthread_create(&threads[i], NULL, _alpm_integrity_check_thread, &payloads[i]);
 		}
-		/* load the package file and replace pkgcache entry with it in the target list */
-		/* TODO: alpm_pkg_get_db() will not work on this target anymore */
-		_alpm_log(PM_LOG_DEBUG, "replacing pkgcache entry with package file for target %s\n", spkg->name);
-		char *filepath = _alpm_filecache_find(filename);
-		pmpkg_t *pkgfile;
-		if(alpm_pkg_load(filepath, 1, &pkgfile) != 0) {
-			_alpm_pkg_free(pkgfile);
-			errors++;
-			*data = alpm_list_add(*data, strdup(filename));
-			FREE(filepath);
-			continue;
+		for(int i = 0; i < numcpus; i++) {
+			pthread_join(threads[i], NULL);
 		}
-		FREE(filepath);
-		pkgfile->reason = spkg->reason; /* copy over install reason */
-		i->data = pkgfile;
-		_alpm_pkg_free_trans(spkg); /* spkg has been removed from the target list */
 	}
+
 	PROGRESS(trans, PM_TRANS_PROGRESS_INTEGRITY_START, "", 100,
 			numtargs, current);
 	EVENT(trans, PM_TRANS_EVT_INTEGRITY_DONE, NULL, NULL);


Reply via email to