I was working on a school project that involved implementing a
multithreaded version of the ComparingUpdateTracker class to try to improve
overall performance. There are a couple of things I was unsure about: I'm not
sure how the THREADED flag and the THREADPOOL_SIZE constant should be set.
If you have recommendations, I would be happy to change them. Also, if there
are any other changes that should be made, I would appreciate the guidance.
If you would rather have the patch embedded instead of attached, just let me
know. Thank you.

Rob King
Index: common/rfb/ComparingUpdateTracker.cxx
===================================================================
--- common/rfb/ComparingUpdateTracker.cxx	(revision 4663)
+++ common/rfb/ComparingUpdateTracker.cxx	(working copy)
@@ -18,20 +18,102 @@
 #include <stdio.h>
 #include <string.h>
 #include <vector>
+#include <algorithm>
 #include <rdr/types.h>
 #include <rfb/Exception.h>
 #include <rfb/ComparingUpdateTracker.h>
 
 using namespace rfb;
 
+#if THREADED
+// Stops the first nThreads workers in tas (sets shouldExit, wakes them and
+// joins them), then destroys and frees the pool's synchronisation objects
+// and argument array.  Used by the destructor and by the constructor's
+// error path.
+static void destroyThreadPool(int nThreads, THREADARGS* tas,
+                              pthread_mutex_t* startMutex,
+                              pthread_mutex_t* doneMutex,
+                              pthread_cond_t* condVars)
+{
+	// Workers test shouldExit under startMutex, so set it under the same
+	// lock before broadcasting; otherwise a worker could miss the wake-up.
+	pthread_mutex_lock(startMutex);
+	for (int i = 0; i < nThreads; i++)
+		tas[i].shouldExit = true;
+	pthread_cond_broadcast(&condVars[START_COND_INDEX]);
+	pthread_mutex_unlock(startMutex);
+
+	// Only once every worker has exited is it safe to free what they use.
+	for (int i = 0; i < nThreads; i++)
+		pthread_join(tas[i].tid, NULL);
+
+	pthread_cond_destroy(&condVars[START_COND_INDEX]);
+	pthread_cond_destroy(&condVars[DONE_COND_INDEX]);
+	pthread_mutex_destroy(startMutex);
+	pthread_mutex_destroy(doneMutex);
+
+	delete[] condVars;
+	delete doneMutex;
+	delete startMutex;
+	delete[] tas;
+}
+#endif // THREADED
+
 ComparingUpdateTracker::ComparingUpdateTracker(PixelBuffer* buffer)
   : fb(buffer), oldFb(fb->getPF(), 0, 0), firstCompare(true)
 {
     changed.assign_union(fb->getRect());
+
+#if THREADED
+	// Build the worker pool.  All workers share one start condition
+	// (paired with startMutex) and one done condition (paired with
+	// doneMutex).  Threads are created joinable (the default) so the
+	// destructor can wait for them before freeing the state they share.
+	tas = new THREADARGS[THREADPOOL_SIZE]();
+	condVars = new pthread_cond_t[2];
+	startMutex = new pthread_mutex_t;
+	doneMutex = new pthread_mutex_t;
+	m_nDoneCount = 0;
+
+	pthread_mutex_init(startMutex, NULL);
+	pthread_mutex_init(doneMutex, NULL);
+	pthread_cond_init(&condVars[START_COND_INDEX], NULL);
+	pthread_cond_init(&condVars[DONE_COND_INDEX], NULL);
+
+	for (int i = 0; i < THREADPOOL_SIZE; i++) {
+		tas[i].changedBlocks = &m_changedBlocks;
+		tas[i].rect = &m_Rect;
+		tas[i].startDoneVars = condVars;
+		tas[i].sMutex = startMutex;
+		tas[i].dMutex = doneMutex;
+		tas[i].fb = fb;
+		tas[i].oldFb = &oldFb;
+		tas[i].argIndex = i;
+		tas[i].shouldExit = false;
+		tas[i].bShouldBeRunning = false;
+		tas[i].pnDoneCount = &m_nDoneCount;
+
+		if (pthread_create(&tas[i].tid, NULL, threadWaitForRects, &tas[i]) != 0) {
+			// A half-built object is never destroyed, so shut down the
+			// workers started so far before reporting the failure.
+			destroyThreadPool(i, tas, startMutex, doneMutex, condVars);
+			throw rfb::Exception("ComparingUpdateTracker: unable to create worker thread");
+		}
+	}
+#endif // THREADED
 }
 
 ComparingUpdateTracker::~ComparingUpdateTracker()
 {
+#if THREADED
+	// Stop and join every worker before releasing the memory they use;
+	// freeing it while they still run (or sleep on it) is undefined.
+	destroyThreadPool(THREADPOOL_SIZE, tas, startMutex, doneMutex, condVars);
+	tas = NULL;
+	startMutex = NULL;
+	doneMutex = NULL;
+	condVars = NULL;
+#endif // THREADED
 }
 
 
@@ -70,6 +152,7 @@
   }
 }
 
+#if (!THREADED)
 void ComparingUpdateTracker::compareRect(const Rect& r, Region* newChanged)
 {
   if (!r.enclosed_by(fb->getRect())) {
@@ -140,3 +223,211 @@
     newChanged->assign_union(temp);
   }
 }
+
+#else
+// THREADED
+// Worker-pool implementation.  compareRect() splits the rectangle into
+// horizontal slices, one per worker, whose heights are multiples of
+// BLOCK_SIZE, so block boundaries fall every BLOCK_SIZE rows from r.tl.y.
+//
+// NOTE(review): this assumes fb->getPixelsR() and oldFb.getPixelsRW() may
+// be called concurrently from several threads.  That holds for buffers
+// that simply return a pointer into memory; confirm it for every
+// PixelBuffer subclass this tracker is used with.
+
+// Orders rectangles top-to-bottom, then left-to-right.
+static bool rectRowMajorLess(const Rect& a, const Rect& b)
+{
+	if (a.tl.y != b.tl.y)
+		return a.tl.y < b.tl.y;
+	return a.tl.x < b.tl.x;
+}
+
+// Returns the part of r that worker `index` compares, possibly empty.
+// Each worker gets ceil(blockRows / THREADPOOL_SIZE) whole rows of
+// BLOCK_SIZE-high blocks, so trailing workers may receive nothing.
+static Rect workerSlice(const Rect& r, int index)
+{
+	int blockRows = (r.br.y - r.tl.y + BLOCK_SIZE - 1) / BLOCK_SIZE;
+	int rowsPerWorker = (blockRows + THREADPOOL_SIZE - 1) / THREADPOOL_SIZE;
+	int sliceHeight = rowsPerWorker * BLOCK_SIZE;
+	int sliceTop = r.tl.y + sliceHeight * index;
+
+	if (sliceHeight <= 0 || sliceTop >= r.br.y)
+		return Rect(0, 0, 0, 0);
+	return Rect(r.tl.x, sliceTop, r.br.x, __rfbmin(r.br.y, sliceTop + sliceHeight));
+}
+
+// Compares `slice` of fb with the same area of oldFb in BLOCK_SIZE
+// blocks via threadedCompareRect(), appending changed blocks to out.
+static void compareSlice(const Rect& slice, PixelBuffer* fb,
+                         ManagedPixelBuffer* oldFb, std::vector<Rect>* out)
+{
+	RECTINFO ri;
+	int oldStride;
+	ri.rect = slice;
+	ri.blockSize = BLOCK_SIZE;
+	ri.oldData = oldFb->getPixelsRW(slice, &oldStride);
+	ri.oldStrideBytes = oldStride * (fb->getPF().bpp / 8);
+	threadedCompareRect(&ri, fb, out);
+}
+
+void ComparingUpdateTracker::compareRect(const Rect& r, Region* newChanged)
+{
+	if (!r.enclosed_by(fb->getRect())) {
+		// Crop the rect and try again
+		Rect safe = r.intersect(fb->getRect());
+		if (!safe.is_empty())
+			compareRect(safe, newChanged);
+		return;
+	}
+	if (r.is_empty())
+		return;
+
+	// No worker runs between calls (the threaded path below waits for all
+	// of them to finish), so the shared results can be reset unlocked.
+	m_changedBlocks.clear();
+	m_nDoneCount = 0;
+
+	if (r.br.y - r.tl.y <= BLOCK_SIZE) {
+		// A single row of blocks is not worth waking the pool for.
+		compareSlice(r, fb, &oldFb, &m_changedBlocks);
+	} else {
+		// Publish the rectangle and wake every worker.
+		pthread_mutex_lock(startMutex);
+		m_Rect = r;
+		for (int i = 0; i < THREADPOOL_SIZE; i++)
+			tas[i].bShouldBeRunning = true;
+		pthread_cond_broadcast(&condVars[START_COND_INDEX]);
+		pthread_mutex_unlock(startMutex);
+
+		// Wait until all workers have reported back.  The predicate is
+		// re-tested in a loop because condition waits can wake
+		// spuriously; returning early would race with the workers.
+		pthread_mutex_lock(doneMutex);
+		while (m_nDoneCount < THREADPOOL_SIZE)
+			pthread_cond_wait(&condVars[DONE_COND_INDEX], doneMutex);
+		pthread_mutex_unlock(doneMutex);
+
+		// Workers append in completion order.  Restore top-to-bottom,
+		// left-to-right order before calling setOrderedRects(), which
+		// (as its name says) is meant for ordered input.
+		std::sort(m_changedBlocks.begin(), m_changedBlocks.end(),
+		          rectRowMajorLess);
+	}
+
+	// Merge all of the changed blocks into a changed region
+	if (!m_changedBlocks.empty()) {
+		Region temp;
+		temp.setOrderedRects(m_changedBlocks);
+		newChanged->assign_union(temp);
+	}
+}
+
+// Worker thread body: waits for compareRect() to publish a rectangle,
+// compares this worker's slice of it and reports the changed blocks,
+// repeating until the pool is shut down.
+void* threadWaitForRects(void* arg)
+{
+	THREADARGS* ta = (THREADARGS*) arg;
+	pthread_cond_t* startCond = &ta->startDoneVars[START_COND_INDEX];
+	pthread_cond_t* doneCond = &ta->startDoneVars[DONE_COND_INDEX];
+	std::vector<Rect> changedRects;
+
+	for (;;) {
+		// Sleep until there is work or the pool is shutting down.  The
+		// flags are only touched under sMutex, and the wait is a loop
+		// because it can wake spuriously.
+		pthread_mutex_lock(ta->sMutex);
+		while (!ta->bShouldBeRunning && !ta->shouldExit)
+			pthread_cond_wait(startCond, ta->sMutex);
+		bool exiting = ta->shouldExit;
+		ta->bShouldBeRunning = false;
+		Rect slice = workerSlice(*ta->rect, ta->argIndex);
+		pthread_mutex_unlock(ta->sMutex);
+
+		if (exiting)
+			break;
+
+		changedRects.clear();
+		if (!slice.is_empty()) {
+			try {
+				compareSlice(slice, ta->fb, ta->oldFb, &changedRects);
+			} catch (...) {
+				// Exceptions must not escape a thread start routine.
+				// Treat the whole slice as changed so it is not
+				// silently dropped from the update.
+				changedRects.clear();
+				changedRects.push_back(slice);
+			}
+		}
+
+		// Publish the results; the last worker to finish wakes compareRect().
+		pthread_mutex_lock(ta->dMutex);
+		ta->changedBlocks->insert(ta->changedBlocks->end(),
+		                          changedRects.begin(), changedRects.end());
+		if (++(*ta->pnDoneCount) == THREADPOOL_SIZE)
+			pthread_cond_signal(doneCond);
+		pthread_mutex_unlock(ta->dMutex);
+	}
+	return NULL;
+}
+
+// Compares ri->rect of fb with the matching pixels of the old frame
+// buffer (ri->oldData, ri->oldStrideBytes bytes per row) in blocks of
+// ri->blockSize x ri->blockSize pixels.  Every block that differs is
+// copied into the old buffer and appended to *changed, row by row and
+// left to right.
+void threadedCompareRect(RECTINFO* ri, rfb::PixelBuffer* fb, std::vector<Rect>* changed)
+{
+	const Rect& r = ri->rect;
+	const int blockSize = ri->blockSize;
+	if (r.is_empty() || blockSize <= 0)
+		return;
+
+	const int bytesPerPixel = fb->getPF().bpp / 8;
+	int fbStride;
+	const rdr::U8* newRowPtr = fb->getPixelsR(r, &fbStride);
+	const int newStrideBytes = fbStride * bytesPerPixel;
+	const int oldStrideBytes = ri->oldStrideBytes;
+	rdr::U8* oldRowPtr = ri->oldData;
+
+	// One pass per row of blocks (the last row may be shorter than
+	// blockSize), and within it one pass per block, left to right.
+	for (int blockTop = r.tl.y; blockTop < r.br.y; blockTop += blockSize) {
+		const int blockBottom = __rfbmin(blockTop + blockSize, r.br.y);
+		const rdr::U8* newBlockPtr = newRowPtr;
+		rdr::U8* oldBlockPtr = oldRowPtr;
+
+		for (int blockLeft = r.tl.x; blockLeft < r.br.x; blockLeft += blockSize) {
+			const int blockRight = __rfbmin(blockLeft + blockSize, r.br.x);
+			const int blockWidthInBytes = (blockRight - blockLeft) * bytesPerPixel;
+			const rdr::U8* newPtr = newBlockPtr;
+			rdr::U8* oldPtr = oldBlockPtr;
+
+			for (int y = blockTop; y < blockBottom; y++) {
+				if (memcmp(oldPtr, newPtr, blockWidthInBytes) != 0) {
+					// A block has changed - copy the remainder to the oldFb
+					changed->push_back(Rect(blockLeft, blockTop, blockRight, blockBottom));
+					for (int y2 = y; y2 < blockBottom; y2++) {
+						memcpy(oldPtr, newPtr, blockWidthInBytes);
+						newPtr += newStrideBytes;
+						oldPtr += oldStrideBytes;
+					}
+					break;
+				}
+				newPtr += newStrideBytes;
+				oldPtr += oldStrideBytes;
+			}
+
+			newBlockPtr += blockWidthInBytes;
+			oldBlockPtr += blockWidthInBytes;
+		}
+
+		// Advance both buffers to the first pixel row of the next block row.
+		newRowPtr += newStrideBytes * (blockBottom - blockTop);
+		oldRowPtr += oldStrideBytes * (blockBottom - blockTop);
+	}
+}
+
+#endif // THREADED
Index: common/rfb/ComparingUpdateTracker.h
===================================================================
--- common/rfb/ComparingUpdateTracker.h	(revision 4663)
+++ common/rfb/ComparingUpdateTracker.h	(working copy)
@@ -21,6 +21,80 @@
 
 #include <rfb/UpdateTracker.h>
 
+// THREADED selects the worker-pool implementation of
+// ComparingUpdateTracker::compareRect().  Prefer setting it from the build
+// system (e.g. -DTHREADED=0 where POSIX threads are unavailable) rather
+// than editing this default.
+#ifndef THREADED
+#define THREADED	1
+#endif
+
+#if	THREADED
+#include <stdlib.h>
+#include <pthread.h>
+#include <vector>
+
+// Number of worker threads.  Override from the build system; a value near
+// the number of CPU cores is a reasonable starting point to benchmark.
+#ifndef THREADPOOL_SIZE
+#define THREADPOOL_SIZE		10
+#endif
+
+// Indexes into the two-element condition-variable array shared by the pool.
+#define START_COND_INDEX	0
+#define	DONE_COND_INDEX		1
+
+/* Per-worker arguments.  The pointers refer to state owned by the
+ * ComparingUpdateTracker that started the worker.
+ */
+struct THREADARGS {
+	std::vector<rfb::Rect>* changedBlocks;
+	rfb::Rect* rect;
+	rfb::PixelBuffer* fb;
+	rfb::ManagedPixelBuffer* oldFb;
+
+	pthread_t tid;
+
+	// sMutex pairs with the start condition; dMutex pairs with the done
+	// condition and guards worker updates of *changedBlocks/*pnDoneCount.
+	pthread_mutex_t* sMutex;
+	pthread_mutex_t* dMutex;
+
+	// Condition variables used to signal start and done conditions
+	pthread_cond_t* startDoneVars;
+
+	// Flag telling the thread to terminate
+	bool shouldExit;
+
+	// Set by compareRect() to hand this thread work; cleared by the
+	// thread when it picks the work up.
+	bool bShouldBeRunning;
+
+	// Zero-based index for the thread
+	int argIndex;
+
+	// Number of threads that are done processing their Rects
+	int* pnDoneCount;
+};
+
+/* Work description passed to threadedCompareRect(): the area to compare,
+ * the matching position in the old frame buffer and its row stride.
+ */
+struct RECTINFO {
+	rfb::Rect rect;
+	rdr::U8* oldData;
+	int oldStrideBytes;
+
+	/* The rectangle is compared in blocks of blockSize x blockSize
+	 * pixels (clipped to rect).
+	 */
+	int blockSize;
+};
+
+void* threadWaitForRects(void* arg);
+void threadedCompareRect(RECTINFO* ri, rfb::PixelBuffer* fb, std::vector<rfb::Rect>* changed);
+#endif
+
 namespace rfb {
 
   class ComparingUpdateTracker : public SimpleUpdateTracker {
@@ -34,6 +108,20 @@
     virtual void compare();
   private:
     void compareRect(const Rect& r, Region* newchanged);
+#if THREADED
+	THREADARGS* tas;
+	std::vector<Rect> m_changedBlocks;
+	Rect m_Rect;
+	pthread_mutex_t* startMutex;
+	pthread_mutex_t* doneMutex;
+	pthread_cond_t* condVars;
+	int m_nDoneCount;
+
+	// Not copyable: the worker threads hold pointers into this object.
+	ComparingUpdateTracker(const ComparingUpdateTracker&);
+	ComparingUpdateTracker& operator=(const ComparingUpdateTracker&);
+#endif
+
     PixelBuffer* fb;
     ManagedPixelBuffer oldFb;
     bool firstCompare;
------------------------------------------------------------------------------
Special Offer -- Download ArcSight Logger for FREE!
Finally, a world-class log management solution at an even better 
price-free! And you'll get a free "Love Thy Logs" t-shirt when you
download Logger. Secure your free ArcSight Logger TODAY!
http://p.sf.net/sfu/arcsisghtdev2dev
_______________________________________________
Tigervnc-devel mailing list
Tigervnc-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tigervnc-devel

Reply via email to