Here is a completely "fixed" version of your program (I hope).
By using semaphores, Thrcheck can reliably see all synchronisation
events. It should give the same results on all runs.
The false positives in Jump::run exist for the following reason.
1. You create NUM_THREADS Loop jobs. These access the counter
array, some with shared locks, some without.
2. You call wait_for_jobs(). This is like a barrier: all the
Loop jobs must finish before it returns.
3. You create NUM_THREADS Jump jobs. These modify the counter
array again.
Unfortunately, in (3), Thrcheck has no way to know that the Jump
jobs are logically unrelated to the Loop jobs that happened before.
So (for example) vec[0].ptr is accessed by multiple threads in the
Loop jobs. And so Thrcheck expects that it should continue to be
protected by a lock in the Jump jobs, but it is not.
This is a classic problem with thread checkers: "memory recycling".
My solution is a standard one. You must tell Thrcheck, after the
Loop jobs have finished, to "forget" what it knows about
vec[..].counter. This is done using the calls to VALGRIND_TC_CLEAN_MEMORY.
One final detail. If you write the program in a simpler way,
then Thrcheck can give correct results without using VALGRIND_TC_CLEAN_MEMORY.
The simpler way is: create a new worker thread for each task, then
wait for it to exit. Thrcheck understands that when a thread joins with
its parent, then the parent "inherits" ownership of the child's memory
locations. So in this case, after all the threads for the Loop jobs
had exited, then the whole memory state would be clean, and ready for
you to start new threads for the Jump jobs.
I hope that makes some sense. Thanks again for the feedback.
J
----------------------------------------------------------------------
g++ -g -O -o thread thread.C -lpthread -I$prefix/include
#include <iostream>
#include <vector>
extern "C" {
#include <pthread.h>
#include <semaphore.h>
#include <assert.h>
#include "valgrind/thrcheck.h"
}
/* Guard for odd-indexed counters.  NOTE: the lock/unlock calls in
   Loop::run are commented out, so this mutex is currently unused --
   the odd-index increments race deliberately (see the discussion above). */
pthread_mutex_t odd_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Guard for even-indexed counters; taken around each even-index
   increment in Loop::run. */
pthread_mutex_t even_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Size of the worker pool, and of the work[] / counter arrays. */
static int const NUM_THREADS = 8;
/* Abstract unit of work executed by a worker thread. */
class Job {
public:
    /* Perform the job's work; invoked from a worker thread. */
    virtual void run() = 0;
    /* Virtual destructor: jobs are used polymorphically (work[]
       holds Job*), so deleting a derived job through a Job* would
       otherwise be undefined behaviour. */
    virtual ~Job() {}
};
/* Counting semaphore: the controller posts once per queued job to
   tell workers to look in the work[] array. */
sem_t jobs_available;
/* Counting semaphore: a worker posts once per finished job to tell
   the controller it is done. */
sem_t jobs_completed;
/* Work array. Must be locked using work_mutex whenever anyone
   accesses it. */
pthread_mutex_t work_mutex = PTHREAD_MUTEX_INITIALIZER;
/* One slot per worker; NULL means the slot is empty.  Aggregate
   initialisation zero-fills the slots beyond the two written here. */
Job* work[NUM_THREADS] = {NULL, NULL};
extern "C" void * start(void *) {
while (true) {
/* First, wait for the controller to tell me there is some
work available. */
int r = sem_wait( &jobs_available ); assert(r==0);
/* Now scan the work[] array to find out what I should do. */
pthread_mutex_lock(&work_mutex);
Job * job = NULL;
for (int i = 0; i < NUM_THREADS; ++i) {
if (work[i] != NULL) {
job = work[i];
work[i] = NULL;
break;
}
}
pthread_mutex_unlock(&work_mutex);
/* If there is no work found, I should quit. */
if (job == NULL)
return NULL;
/* else do it, then post to the controller that I am done. */
job->run();
r = sem_post(&jobs_completed); assert(r==0);
}
/*NOTREACHED*/
}
/* Per-slot payload: a single counter, incremented by the jobs. */
struct Data {
    Data() { counter = 0; }  // every counter starts at zero
    int counter;
};
/* Job that walks the whole vector once, bumping every counter.
   Even-indexed counters are protected by even_mutex; odd-indexed
   counters are updated deliberately WITHOUT odd_mutex (the lock
   calls are left commented out) so the checker has a genuine race
   to report. */
class Loop : public Job {
public:
    Loop(std::vector<Data *> & vec) : vec(vec) {}
    virtual void run() {
        const std::size_t total = vec.size();
        for (std::size_t idx = 0; idx < total; ++idx) {
            const bool even_slot = (idx % 2 == 0);
            if (even_slot)
                pthread_mutex_lock(&even_mutex);
            // pthread_mutex_lock(&odd_mutex);   -- intentionally off for odd slots
            vec[idx]->counter++;
            // pthread_mutex_unlock(&odd_mutex); -- intentionally off for odd slots
            if (even_slot)
                pthread_mutex_unlock(&even_mutex);
        }
    }
private:
    std::vector<Data *> & vec;
};
/* Job that touches only its own slots: indices congruent to
   my_indices modulo NUM_THREADS.  Each Jump instance therefore owns
   a disjoint subset of the counters, so no locking is required. */
class Jump : public Job {
public:
    Jump(std::vector<Data *> & vec, std::size_t my_indices)
        : vec(vec), my_indices(my_indices) {}
    virtual void run() {
        const std::size_t limit = vec.size();
        for (std::size_t pos = 0; pos < limit; ++pos) {
            if (pos % NUM_THREADS != my_indices)
                continue;  // not one of my slots
            vec[pos]->counter++;
        }
    }
private:
    std::vector<Data *> & vec;
    std::size_t my_indices;
};
/* Block until `num` jobs have reported completion (one sem_wait per
   finished job).  Acts as a barrier for the current batch. */
void wait_for_jobs(int num) {
    for (int done = 0; done < num; ++done) {
        std::cout << "waitf_j " << done << std::endl;
        const int rc = sem_wait( &jobs_completed );
        assert(rc == 0);
    }
}
/* Release `num` workers by posting jobs_available once per job that
   has been placed in (or deliberately left out of) work[]. */
void start_jobs(int num) {
    for (int posted = 0; posted < num; ++posted) {
        std::cout << "start_j " << posted << std::endl;
        const int rc = sem_post( &jobs_available );
        assert(rc == 0);
    }
}
int main() {
int r;
int* counter_ptrs[NUM_THREADS];
r = sem_init( &jobs_available, 0/*pshared*/, 0/*initial_value*/ );
assert(r==0);
r = sem_init( &jobs_completed, 0/*pshared*/, 0/*initial_value*/ );
assert(r==0);
std::cout << "Starting main programm" << std::endl;
pthread_t tid[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_create(&tid[i], NULL, start, NULL);
}
std::cout << "Threads created" << std::endl;
std::vector<Data *> vec;
for (std::size_t i = 0; i < NUM_THREADS; ++i) {
vec.push_back(new Data());
std::cout << "Created counter at address: "
<< &vec.back()->counter << std::endl;
counter_ptrs[i] = &vec.back()->counter;
}
std::cout << "Vector initialized" << std::endl;
Loop l(vec);
pthread_mutex_lock(&work_mutex);
for (std::size_t i = 0; i < NUM_THREADS; ++i) {
work[i] = &l;
}
pthread_mutex_unlock(&work_mutex);
std::cout << "Added loop jobs" << std::endl;
start_jobs(NUM_THREADS);
wait_for_jobs(NUM_THREADS);
/* Create new jobs and clean up the memory used by the old jobs. */
std::vector<Jump *> jump_vec;
for (std::size_t i = 0; i < NUM_THREADS; ++i) {
jump_vec.push_back(new Jump(vec, i));
VALGRIND_TC_CLEAN_MEMORY( counter_ptrs[i], sizeof(int) );
}
pthread_mutex_lock(&work_mutex);
for (std::size_t i = 0; i < NUM_THREADS; ++i) {
assert(work[i] == NULL);
work[i] = jump_vec[i];
}
pthread_mutex_unlock(&work_mutex);
std::cout << "Added jump jobs" << std::endl;
start_jobs(NUM_THREADS);
wait_for_jobs(NUM_THREADS);
/* Cause all the threads to exit, by ensuring work[] is completely
NULL (which it should already be), then doing start_jobs. All
threads will fail to find any work, and exit. */
pthread_mutex_lock(&work_mutex);
for (std::size_t i = 0; i < NUM_THREADS; ++i) {
assert(work[i] == NULL);
}
pthread_mutex_unlock(&work_mutex);
start_jobs(NUM_THREADS);
/* And collect up all the threads. */
for (std::size_t i = 0; i < NUM_THREADS; ++i) {
pthread_join(tid[i], NULL);
}
}
-------------------------------------------------------------------------
This SF.net email is sponsored by: Splunk Inc.
Still grepping through log files to find problems? Stop.
Now Search log events and configuration files using AJAX and a browser.
Download your FREE copy of Splunk now >> http://get.splunk.com/
_______________________________________________
Valgrind-developers mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/valgrind-developers