I was working on a school project that involved implementing a
multithreaded version of the ComparingUpdateTracker class to try to improve
overall performance. There are a couple of things I was unsure about. In
particular, I'm not sure how the THREADED flag and the THREADPOOL_SIZE
constant should be set. If you have recommendations, I would be happy to
change them. Also, if there are any other changes that should be made, I would
appreciate the guidance. If you would rather have the patch embedded instead
of attached, just let me know. Thank you.
Rob King
Index: common/rfb/ComparingUpdateTracker.cxx
===================================================================
--- common/rfb/ComparingUpdateTracker.cxx (revision 4663)
+++ common/rfb/ComparingUpdateTracker.cxx (working copy)
@@ -18,20 +18,91 @@
#include <stdio.h>
#include <string.h>
#include <vector>
+#include <list>
+#include <algorithm>
#include <rdr/types.h>
#include <rfb/Exception.h>
#include <rfb/ComparingUpdateTracker.h>
using namespace rfb;
+#if THREADED
+
+#include <sys/time.h>
+#include <time.h>
+
+#endif
+
+#if THREADED
+// Tells the first nThreads workers in tas to exit, wakes them, and joins
+// them, so that none of them can touch the shared pool state afterwards.
+static void stopWorkerThreads(THREADARGS* tas, int nThreads,
+                              pthread_mutex_t* startMutex,
+                              pthread_cond_t* condVars)
+{
+  pthread_mutex_lock(startMutex);
+  for (int i = 0; i < nThreads; i++) {
+    tas[i].shouldExit = true;
+    // Also mark the worker runnable, so a worker that has not blocked on the
+    // start condition yet cannot end up waiting for a broadcast that has
+    // already happened.
+    tas[i].bShouldBeRunning = true;
+  }
+  pthread_cond_broadcast(&condVars[START_COND_INDEX]);
+  pthread_mutex_unlock(startMutex);
+
+  for (int i = 0; i < nThreads; i++)
+    pthread_join(tas[i].tid, NULL);
+}
+
+// Destroys the pool's mutexes and condition variables and frees them and the
+// per-thread argument array. Every worker must already have been joined.
+static void destroyWorkerPoolState(THREADARGS* tas,
+                                   pthread_mutex_t* startMutex,
+                                   pthread_mutex_t* doneMutex,
+                                   pthread_cond_t* condVars)
+{
+  pthread_cond_destroy(&condVars[START_COND_INDEX]);
+  pthread_cond_destroy(&condVars[DONE_COND_INDEX]);
+  pthread_mutex_destroy(startMutex);
+  pthread_mutex_destroy(doneMutex);
+  delete[] condVars;
+  delete startMutex;
+  delete doneMutex;
+  delete[] tas;
+}
+#endif // THREADED
+
ComparingUpdateTracker::ComparingUpdateTracker(PixelBuffer* buffer)
: fb(buffer), oldFb(fb->getPF(), 0, 0), firstCompare(true)
{
changed.assign_union(fb->getRect());
+
+#if THREADED
+  // Set up the worker pool. The threads use the default (joinable)
+  // attributes so the destructor can wait for them to terminate before the
+  // state they point at is released.
+  tas = new THREADARGS[THREADPOOL_SIZE]();
+  condVars = new pthread_cond_t[2];    // START_COND_INDEX, DONE_COND_INDEX
+  startMutex = new pthread_mutex_t;
+  doneMutex = new pthread_mutex_t;
+
+  pthread_mutex_init(startMutex, NULL);
+  pthread_mutex_init(doneMutex, NULL);
+  pthread_cond_init(&condVars[START_COND_INDEX], NULL);
+  pthread_cond_init(&condVars[DONE_COND_INDEX], NULL);
+
+  m_nDoneCount = 0;
+
+  for (int i = 0; i < THREADPOOL_SIZE; i++) {
+    // Every worker shares this tracker's buffers, result list and sync objects
+    tas[i].changedBlocks = &m_changedBlocks;
+    tas[i].rect = &m_Rect;
+    tas[i].startDoneVars = condVars;
+    tas[i].sMutex = startMutex;
+    tas[i].dMutex = doneMutex;
+    tas[i].fb = fb;
+    tas[i].oldFb = &oldFb;
+    tas[i].argIndex = i;
+    tas[i].shouldExit = false;
+    tas[i].bShouldBeRunning = false;
+    tas[i].pnDoneCount = &m_nDoneCount;
+
+    if (pthread_create(&tas[i].tid, NULL, threadWaitForRects, &tas[i]) != 0) {
+      // compareRect() waits for THREADPOOL_SIZE workers, so a partial pool
+      // would hang it: undo the setup done so far and report the failure.
+      stopWorkerThreads(tas, i, startMutex, condVars);
+      destroyWorkerPoolState(tas, startMutex, doneMutex, condVars);
+      throw rfb::Exception("ComparingUpdateTracker: unable to create worker thread");
+    }
+  }
+#endif
}
ComparingUpdateTracker::~ComparingUpdateTracker()
{
+#if THREADED
+  // Stop and join every worker first: the workers hold pointers to this
+  // object's members and to the pool state that is released right after.
+  stopWorkerThreads(tas, THREADPOOL_SIZE, startMutex, condVars);
+  destroyWorkerPoolState(tas, startMutex, doneMutex, condVars);
+#endif // THREADED
}
@@ -70,6 +141,7 @@
}
}
+#if (!THREADED)
void ComparingUpdateTracker::compareRect(const Rect& r, Region* newChanged)
{
if (!r.enclosed_by(fb->getRect())) {
@@ -140,3 +212,184 @@
newChanged->assign_union(temp);
}
}
+
+#else
+// THREADED
+// The threaded version needs the following functions
+
+// Orders rects top-to-bottom, then left-to-right. This is the row-major
+// order in which the serial compareRect() emits its changed blocks.
+static bool rectComesBefore(const Rect& a, const Rect& b)
+{
+  if (a.tl.y != b.tl.y)
+    return a.tl.y < b.tl.y;
+  return a.tl.x < b.tl.x;
+}
+
+// Threaded comparison of r against the saved framebuffer copy. r is first
+// cropped to the framebuffer. It is then published to the worker pool, and
+// this call blocks until every worker has reported. The changed blocks the
+// workers collected are finally merged into *newChanged.
+void ComparingUpdateTracker::compareRect(const Rect& r, Region* newChanged)
+{
+  if (!r.enclosed_by(fb->getRect())) {
+    Rect safe;
+    // Crop the rect and try again
+    safe = r.intersect(fb->getRect());
+    if (!safe.is_empty())
+      compareRect(safe, newChanged);
+    return;
+  }
+
+  // Reset the shared results; the workers only touch them under doneMutex
+  pthread_mutex_lock(doneMutex);
+  m_nDoneCount = 0;
+  m_changedBlocks.clear();
+  pthread_mutex_unlock(doneMutex);
+
+  // Publish the rect and release every worker
+  pthread_mutex_lock(startMutex);
+  m_Rect = r;
+  for (int i = 0; i < THREADPOOL_SIZE; i++)
+    tas[i].bShouldBeRunning = true;
+  pthread_cond_broadcast(&condVars[START_COND_INDEX]);
+  pthread_mutex_unlock(startMutex);
+
+  // Wait until all workers have reported. The count is only read with
+  // doneMutex held, and the predicate loop absorbs spurious wake-ups.
+  pthread_mutex_lock(doneMutex);
+  while (m_nDoneCount < THREADPOOL_SIZE)
+    pthread_cond_wait(&condVars[DONE_COND_INDEX], doneMutex);
+  pthread_mutex_unlock(doneMutex);
+
+  // Merge all of the changed blocks into a changed region. The workers
+  // finish in arbitrary order, so restore the serial path's row-major order
+  // before handing the list to setOrderedRects().
+  if (!m_changedBlocks.empty()) {
+    std::sort(m_changedBlocks.begin(), m_changedBlocks.end(), rectComesBefore);
+    Region temp;
+    temp.setOrderedRects(m_changedBlocks);
+    newChanged->assign_union(temp);
+  }
+}
+
+// Body of each pool thread. In a loop, it does the following:
+//  - wait on the start condition until compareRect() releases it, or until
+//    shutdown is requested;
+//  - compare this thread's horizontal strip of the published rect;
+//  - append the changed blocks to the shared list;
+//  - bump the done counter, signalling the done condition once all
+//    THREADPOOL_SIZE threads have reported.
+//
+// arg must point to this thread's THREADARGS. Returns NULL after shouldExit
+// has been observed.
+void* threadWaitForRects(void* arg)
+{
+  THREADARGS* threadargs = (THREADARGS*) arg;
+  pthread_cond_t* startCond = &threadargs->startDoneVars[START_COND_INDEX];
+  pthread_cond_t* doneCond = &threadargs->startDoneVars[DONE_COND_INDEX];
+
+  for (;;) {
+    // Wait for work. The predicate loop tolerates spurious wake-ups. The
+    // flags and the rect are read under the mutex the publisher holds while
+    // writing them.
+    pthread_mutex_lock(threadargs->sMutex);
+    while (!threadargs->bShouldBeRunning && !threadargs->shouldExit)
+      pthread_cond_wait(startCond, threadargs->sMutex);
+    if (threadargs->shouldExit) {
+      pthread_mutex_unlock(threadargs->sMutex);
+      break;
+    }
+    threadargs->bShouldBeRunning = false;
+    Rect r = *(threadargs->rect);
+    pthread_mutex_unlock(threadargs->sMutex);
+
+    std::vector<Rect> changedRects;
+
+    if (!r.is_empty()) {
+      int bytesPerPixel = threadargs->fb->getPF().bpp/8;
+
+      // One strip per thread. Strip heights are whole multiples of
+      // BLOCK_SIZE, so the blocks line up with the grid the serial
+      // compareRect() uses. Trailing threads may get no rows at all.
+      int blockRows = (r.br.y - r.tl.y + BLOCK_SIZE - 1) / BLOCK_SIZE;
+      int stripHeight = ((blockRows + THREADPOOL_SIZE - 1) / THREADPOOL_SIZE) * BLOCK_SIZE;
+      int stripTop = r.tl.y + stripHeight * threadargs->argIndex;
+      int stripBottom = __rfbmin(stripTop + stripHeight, r.br.y);
+
+      // Compare the strip one BLOCK_SIZE-high band at a time, so changed
+      // regions keep BLOCK_SIZE granularity instead of growing with the
+      // strip height.
+      for (int bandTop = stripTop; bandTop < stripBottom; bandTop += BLOCK_SIZE) {
+        Rect band(r.tl.x, bandTop, r.br.x, __rfbmin(bandTop + BLOCK_SIZE, stripBottom));
+        RECTINFO rectinfo;
+        int oldStride;
+
+        rectinfo.rect = band;
+        rectinfo.blockSize = BLOCK_SIZE;
+        rectinfo.oldData = threadargs->oldFb->getPixelsRW(band, &oldStride);
+        rectinfo.oldStrideBytes = oldStride * bytesPerPixel;
+
+        threadedCompareRect(&rectinfo, threadargs->fb, &changedRects);
+      }
+    }
+
+    // Publish this strip's results and report completion. The last thread
+    // to finish wakes the waiting compareRect().
+    pthread_mutex_lock(threadargs->dMutex);
+    threadargs->changedBlocks->insert(threadargs->changedBlocks->end(),
+                                      changedRects.begin(), changedRects.end());
+    (*threadargs->pnDoneCount)++;
+    if (*threadargs->pnDoneCount >= THREADPOOL_SIZE)
+      pthread_cond_signal(doneCond);
+    pthread_mutex_unlock(threadargs->dMutex);
+  }
+  return NULL;
+}
+
+// Compares the pixels of ri->rect in fb with the old framebuffer copy at
+// ri->oldData, whose row pitch is ri->oldStrideBytes. The comparison uses
+// square blocks of ri->blockSize pixels, laid out from the rect's top-left
+// corner. For every block that differs:
+//  - the block's rect is appended to *changed;
+//  - the block's rows, from the first differing row to the bottom of the
+//    block, are copied into the old copy.
+// Rects taller than one block are walked row of blocks by row of blocks.
+void threadedCompareRect(RECTINFO* ri, rfb::PixelBuffer* fb, std::vector<Rect>* changed)
+{
+  Rect r = ri->rect;
+  int blockSize = ri->blockSize;
+  // A non-positive block size would never advance the loops below
+  if (blockSize <= 0)
+    blockSize = BLOCK_SIZE;
+
+  int fbStride;
+  int bytesPerPixel = fb->getPF().bpp/8;
+  const rdr::U8* newRowPtr = fb->getPixelsR(r, &fbStride);
+  int newStrideBytes = fbStride * bytesPerPixel;
+  int oldStrideBytes = ri->oldStrideBytes;
+  rdr::U8* oldRowPtr = ri->oldData;
+
+  for (int blockTop = r.tl.y; blockTop < r.br.y; blockTop += blockSize) {
+    int blockBottom = __rfbmin(blockTop + blockSize, r.br.y);
+    const rdr::U8* newBlockPtr = newRowPtr;
+    rdr::U8* oldBlockPtr = oldRowPtr;
+
+    for (int blockLeft = r.tl.x; blockLeft < r.br.x; blockLeft += blockSize) {
+      const rdr::U8* newPtr = newBlockPtr;
+      rdr::U8* oldPtr = oldBlockPtr;
+
+      int blockRight = __rfbmin(blockLeft + blockSize, r.br.x);
+      int blockWidthInBytes = (blockRight - blockLeft) * bytesPerPixel;
+
+      for (int y = blockTop; y < blockBottom; y++) {
+        if (memcmp(oldPtr, newPtr, blockWidthInBytes) != 0) {
+          // A block has changed - copy the remainder to the oldFb
+          changed->push_back(Rect(blockLeft, blockTop, blockRight, blockBottom));
+          for (int y2 = y; y2 < blockBottom; y2++) {
+            memcpy(oldPtr, newPtr, blockWidthInBytes);
+            newPtr += newStrideBytes;
+            oldPtr += oldStrideBytes;
+          }
+          break;
+        }
+
+        newPtr += newStrideBytes;
+        oldPtr += oldStrideBytes;
+      }
+
+      oldBlockPtr += blockWidthInBytes;
+      newBlockPtr += blockWidthInBytes;
+    }
+
+    // Step down to the first row of the next row of blocks
+    newRowPtr += newStrideBytes * (blockBottom - blockTop);
+    oldRowPtr += oldStrideBytes * (blockBottom - blockTop);
+  }
+}
+
+#endif // THREADED
Index: common/rfb/ComparingUpdateTracker.h
===================================================================
--- common/rfb/ComparingUpdateTracker.h (revision 4663)
+++ common/rfb/ComparingUpdateTracker.h (working copy)
@@ -21,6 +21,76 @@
#include <rfb/UpdateTracker.h>
+// THREADED selects the pthread-based, multithreaded compareRect().
+// Ideally the build system defines it (-DTHREADED=1 or -DTHREADED=0), for
+// example after a configure-time check for pthreads. When it is not defined,
+// default to the threaded build everywhere except Windows, whose toolchains
+// typically provide no <pthread.h>.
+#ifndef THREADED
+#ifdef _WIN32
+#define THREADED 0
+#else
+#define THREADED 1
+#endif
+#endif
+
+#if THREADED
+#include <stdlib.h>
+#include <pthread.h>
+#include <queue>
+#include <vector>
+
+// Number of worker threads used by the threaded compareRect(). It can be
+// overridden from the build (e.g. -DTHREADPOOL_SIZE=4). A value near the
+// number of available CPU cores is a reasonable starting point to measure
+// from.
+#ifndef THREADPOOL_SIZE
+#define THREADPOOL_SIZE 10
+#endif
+
+#if THREADPOOL_SIZE < 1
+#error "THREADPOOL_SIZE must be at least 1"
+#endif
+
+// Indexes into the two-element condition variable array shared by the pool
+#define START_COND_INDEX 0
+#define DONE_COND_INDEX 1
+
+/* Arguments handed to each worker thread. The pointer members refer to
+ * state owned by the ComparingUpdateTracker that started the thread.
+ *
+ * Intended locking protocol:
+ *  - *rect, shouldExit and bShouldBeRunning are only accessed with *sMutex
+ *    held;
+ *  - *changedBlocks and *pnDoneCount are only accessed with *dMutex held.
+ */
+typedef struct tagTHREADARGS{
+  // Shared list that receives the changed blocks found by every worker
+  std::vector<rfb::Rect>* changedBlocks;
+  // Rect published for the current comparison round
+  rfb::Rect* rect;
+  // Current framebuffer and the saved copy it is compared against
+  rfb::PixelBuffer* fb;
+  rfb::ManagedPixelBuffer* oldFb;
+
+  pthread_t tid;
+
+  // Guards the start-of-round state (sMutex) and the results (dMutex)
+  pthread_mutex_t* sMutex;
+  pthread_mutex_t* dMutex;
+
+  // Two condition variables, indexed by START_COND_INDEX and DONE_COND_INDEX
+  pthread_cond_t* startDoneVars;
+
+  // Flag telling the thread to terminate
+  bool shouldExit;
+
+  // Set when a comparison round has been started for this thread; it also
+  // covers a start signal sent before the thread was waiting
+  bool bShouldBeRunning;
+
+  // Zero-based index for the thread
+  int argIndex;
+
+  // Number of threads that are done processing their Rects
+  int* pnDoneCount;
+} THREADARGS;
+
+/* This is the struct passed by each thread to the
+ * threadedCompareRect function to do the comparison
+ */
+typedef struct tagRECTINFO{
+  // Area of the framebuffer to compare
+  rfb::Rect rect;
+  // Old framebuffer data for rect.tl and its row pitch in bytes
+  rdr::U8* oldData;
+  int oldStrideBytes;
+
+  // Edge length, in pixels, of the blocks the comparison works in
+  int blockSize;
+} RECTINFO;
+
+// Worker thread entry point; arg must point to that thread's THREADARGS.
+void* threadWaitForRects(void* arg);
+// Compares fb against the old framebuffer copy described by ri, appending
+// each changed block to *changed and refreshing the old copy for it.
+void threadedCompareRect(RECTINFO* ri, rfb::PixelBuffer* fb, std::vector<rfb::Rect>* changed);
+#endif
+
namespace rfb {
class ComparingUpdateTracker : public SimpleUpdateTracker {
@@ -34,6 +104,16 @@
virtual void compare();
private:
void compareRect(const Rect& r, Region* newchanged);
+#if THREADED
+ THREADARGS* tas;
+ std::vector<Rect> m_changedBlocks;
+ Rect m_Rect;
+ pthread_mutex_t* startMutex;
+ pthread_mutex_t* doneMutex;
+ pthread_cond_t* condVars;
+ int m_nDoneCount;
+#endif
+
PixelBuffer* fb;
ManagedPixelBuffer oldFb;
bool firstCompare;
------------------------------------------------------------------------------
Special Offer -- Download ArcSight Logger for FREE!
Finally, a world-class log management solution at an even better
price-free! And you'll get a free "Love Thy Logs" t-shirt when you
download Logger. Secure your free ArcSight Logger TODAY!
http://p.sf.net/sfu/arcsisghtdev2dev
_______________________________________________
Tigervnc-devel mailing list
Tigervnc-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tigervnc-devel