Jason,

Yes, the code below reproduces the problem using some number crunching
instead of equation solving, but the symptom is the same. On my quad-core
laptop, with NEQUATION = 500 the speedup is perfect.

For the very important NEQUATION = 8 case, however, the serial version is
twice as fast.

We need to solve millions of small equations that change over time, so I
cannot batch several of them into one thread; each message carries just one.
Obviously there is a fixed cost somewhere, but I have not been able to figure
out what it is (0MQ is still new to me). It looks as if the latency is high
when I send only a few messages, which I do not understand.

I use the 64-bit Intel compiler.

I would appreciate any suggestions.



// pairTest.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <zmq.h>
#include <zmq_utils.h>
#include <stdio.h>
#include <stdlib.h>
#include <vector>
#include <windows.h>
#include <process.h>
#include <assert.h>

static int roundtrip_count = 0 ;
#define NEQUATION 8
#define NCYCLE 1

/**
 * One worker of the benchmark.
 *
 * Each worker talks to main over two ZMQ_PAIR sockets:
 *   - m_socketReceive connects to "inproc://connect_<id>" (bound by main) and
 *     receives work items; a zero-length message tells the worker to stop.
 *   - m_socketSend binds "inproc://terminate_<id>" (main connects to it); the
 *     worker sends one zero-length message on it just before exiting.
 *
 * Both sockets are created and bound/connected in the constructor, i.e. on the
 * thread that constructs the object. Socket setup is therefore not part of
 * anything the caller times after starting the worker thread. After that, the
 * sockets are used and closed only by the worker thread. ZeroMQ sockets are not
 * thread-safe, but they may be handed to another thread across a full memory
 * barrier, and starting the thread provides one.
 *
 * Precondition: "inproc://connect_<id>" must already be bound when the object
 * is constructed, because older libzmq releases require an inproc bind to
 * precede the connect.
 */
class ComputingThread
{

public:

	ComputingThread(void * ctx, const int& id)
		: m_ctx(ctx), m_id(id), m_socketReceive(NULL), m_socketSend(NULL), m_result(0.0)
	{
		char connText[128];

		// Completion channel: main connects to this endpoint.
		this->m_socketSend = zmq_socket(this->m_ctx, ZMQ_PAIR);
		sprintf(connText, "inproc://terminate_%d", this->m_id);
		int rc = zmq_bind(this->m_socketSend, connText);
		assert(rc == 0);

		// Work channel: connect to the endpoint main has already bound.
		this->m_socketReceive = zmq_socket(this->m_ctx, ZMQ_PAIR);
		sprintf(connText, "inproc://connect_%d", this->m_id);
		rc = zmq_connect(this->m_socketReceive, connText);
		assert(rc == 0);
		(void)rc;
	}

	/// _beginthreadex entry point; pThis must point to a live ComputingThread.
	static unsigned __stdcall ThreadStaticEntryPoint(void * pThis)
	{
		ComputingThread * pthX = static_cast<ComputingThread *>(pThis);
		return static_cast<unsigned>(pthX->worker());
	}

	/// Sum of the results of all work items processed by worker().
	/// Read it only after the worker thread has exited.
	double result() const { return m_result; }

	/**
	 * Thread body. Receives work items until a zero-length message arrives or a
	 * receive fails, running crunch() once per item. It then sends one
	 * zero-length message on m_socketSend and closes both sockets.
	 * Always returns 0.
	 */
	int worker()
	{
		zmq_msg_t msg;
		int rc = zmq_msg_init(&msg);

		double total = 0.0;
		while (true)
		{
			rc = zmq_recvmsg(this->m_socketReceive, &msg, 0);
			// A receive error or an empty message both mean "stop".
			if (rc == -1 || zmq_msg_size(&msg) == 0)
				break;
			total += crunch();
		}
		// Keep the result observable so the optimizer cannot drop the
		// floating-point work as dead code.
		m_result = total;

		// Tell main this worker is done.
		zmq_msg_t msgTerm;
		rc = zmq_msg_init(&msgTerm);
		rc = zmq_sendmsg(this->m_socketSend, &msgTerm, 0);
		zmq_msg_close(&msgTerm);

		zmq_msg_close(&msg);

		zmq_close(this->m_socketReceive);
		zmq_close(this->m_socketSend);
		this->m_socketReceive = NULL;
		this->m_socketSend = NULL;

		(void)rc;
		return 0;
	}

private:
	/// One unit of work: NCYCLE * 100000 iterations of transcendental math.
	static double crunch()
	{
		double sum = 0.0;
		for (int ic1 = 0; ic1 < NCYCLE; ic1++)
		{
			for (int ic2 = 0; ic2 < 100000; ic2++)
			{
				int v = rand() % 100;
				sum = sum + sin(float(v)) + sqrt(float(v) + cos(sqrt(float(v))));
			}
		}
		return sum;
	}

	// Not copyable: the object owns raw socket handles (declared, never defined).
	ComputingThread(const ComputingThread &);
	ComputingThread & operator=(const ComputingThread &);

	void * m_ctx;
	int m_id;

	void * m_socketReceive;
	void * m_socketSend;
	double m_result;
};




int main(int argc, const char* const argv[])
{
std::vector<ComputingThread * > computingThreads ;
std::vector< HANDLE> handles ;
std::vector<void *> socketsSend ;
std::vector<void *> socketsReceive ;
std::vector<zmq_msg_t> messages ;

void *watch = NULL;

int nequation = NEQUATION ;

int nthread = 8 ;
unsigned long elapsed = 0;

printf("\nSerial Started...\n") ;

watch = zmq_stopwatch_start();

int iequation = 0;
for(iequation = 0 ; iequation < nequation ; iequation++)
{

int ic1 = 0 ;
int ic2 = 0 ;
double sum = 0.0 ;
for(ic1 = 0 ; ic1 < NCYCLE ; ic1++)
{
for(ic2 = 0 ; ic2 < 100000 ; ic2++)
{
int v = rand() % 100;
sum = sum + sin(float(v)) + sqrt(float(v) + cos(sqrt(float(v)))) ;
}

}

}

unsigned long serialElapsed = zmq_stopwatch_stop(watch) / 2 ;

printf("\nSerial Done. Took %ld microsec.\n", serialElapsed) ;


void *ctx = NULL;
void * socket = NULL ;

int rc = 0 ;




ctx = zmq_init(1);

// create sockets
int ithread = 0 ;
for(ithread = 0 ; ithread < nthread ; ithread++)
{
void * socket = zmq_socket(ctx, ZMQ_PAIR);
socketsSend.push_back(socket);

socket = zmq_socket(ctx, ZMQ_PAIR);
socketsReceive.push_back(socket);


}


// connections
for(ithread = 0 ; ithread < nthread ; ithread++)
{
void * socket = socketsSend[ithread];
char connectionString[128];
sprintf(connectionString,"inproc://connect_%d",ithread);
rc = zmq_bind(socket, connectionString);

}


for(ithread = 0 ; ithread < nthread; ithread++)
{
ComputingThread * cthr = new ComputingThread(ctx, ithread);
computingThreads.push_back(cthr);
HANDLE localHandle = (HANDLE) _beginthreadex(NULL, 0,
ComputingThread::ThreadStaticEntryPoint,
cthr, 0, NULL);
if (localHandle == 0)
{
printf ("error in _beginthreadex %d\n",ithread);
return -1;
}
handles.push_back(localHandle);
}


for(ithread = 0 ; ithread < nthread ; ithread++)
{

char connectionString[128];
sprintf(connectionString,"inproc://terminate_%d",ithread);
void * socket = socketsReceive[ithread];
rc = zmq_connect (socket,connectionString);


}



int i = 0 ;
printf("\nParallel Started...\n") ;

int nfinish = nthread ;
zmq_msg_t msgTerm;
rc = zmq_msg_init (&msgTerm);


watch = zmq_stopwatch_start();

// send messages
int messageCounter = 0 ;

for(iequation = 0 ; iequation < nequation ; iequation++)
{
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
rc = zmq_msg_init_size(&msg, 8);
memset (zmq_msg_data (&msg), 'A', 8);
ithread = messageCounter % nthread ;
messageCounter++ ;
void * socket = socketsSend[ithread];
rc = zmq_sendmsg (socket, &msg, 0);
zmq_msg_close(&msg);
}


// terminate threads
for(ithread = 0; ithread < nthread ; ithread++)
{
zmq_msg_t msgTerm;
rc = zmq_msg_init (&msgTerm);
void * socket = socketsSend[ithread];
rc = zmq_sendmsg (socket, &msgTerm, 0);
zmq_msg_close(&msgTerm);

}


int flags[16] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};

while(nfinish > 0)
{
for(ithread = 0; ithread < nthread ; ithread++)
{
if(flags[ithread])
{
void * recv = socketsReceive[ithread];
rc = zmq_recvmsg (recv, &msgTerm, 0);
int messageSize = zmq_msg_size (&msgTerm) ;
if(messageSize == 0)
{
flags[ithread] = 0;
nfinish-- ;
break ;
}

}
}

}


unsigned long parallelElapsed = zmq_stopwatch_stop(watch) / 2 ;

printf("\nParallel Done. Took %ld microsec.\n", parallelElapsed) ;

double speedup = float(serialElapsed) / parallelElapsed ;

printf("\nspeedup = %g\n", speedup) ;


DWORD rc2 = WaitForMultipleObjects( nthread, &handles[0], true, INFINITE);
if (rc2 == WAIT_FAILED)
{
printf ("error in WaitForMultipleObject\n");
return -1;
}
assert( ( rc2 >= WAIT_OBJECT_0 ) && ( rc2 <= ( WAIT_OBJECT_0 + nthread - 1
) ) );





//close handles
for(ithread = 0; ithread < nthread ; ithread++)
{
HANDLE localHandle = handles[ithread];
BOOL rcc = CloseHandle (localHandle);
if (rcc == 0)
{
printf ("error in CloseHandle\n");
return -1;
}


}


for(ithread = 0 ; ithread < nthread ; ithread++)
{
void * socket = socketsSend[ithread];
rc = zmq_close (socket);
socket = socketsReceive[ithread];
rc = zmq_close (socket);


}

zmq_term(ctx);

return 0;
}



On Tue, Jan 22, 2013 at 3:39 PM, Jason Smith <[email protected]>wrote:

> Do you have some code to share? Particularly the ZMQ socket connection and
> creation.
>
> On another thought, how is the "Finish" determined? Do the threads end, or
> do they continue to wait for another message? Is a "finished" message sent
> to the main thread?
>
>
> On 22 January 2013 00:06, dan smith <[email protected]> wrote:
>
>> more precisely: 'I do not know how to debug that further'
>>
>>
>>
>> On Mon, Jan 21, 2013 at 2:49 AM, dan smith <[email protected]> wrote:
>>
>>>
>>> Claudio,
>>>
>>> Thanks for your answer.
>>>
>>> When it comes to 8, the timing changes randomly, sometime times is less
>>> than the time needed to solve 80 equations. I did 4 runs again, the times
>>> were between 115000 and 125000. This program is very simple and each and
>>> every thread does the very same thing using the very same code, no
>>> difference between the threads. I do know how to debug that further. The
>>>  main thread just sends the pointer and the worker thread solves it after
>>> receiving it, 3 lines are relevant.The thread is a basic windows thread
>>> like in inproc_lat.
>>>
>>>
>>> On Mon, Jan 21, 2013 at 2:13 AM, Claudio Carbone <[email protected]>wrote:
>>>
>>>> dan smith <[email protected]> wrote:
>>>> >80 times: 64614micros and 134429micros, the serial is already faster.
>>>> >
>>>> >Going down to 8: 6345 and 328286...
>>>> >
>>>>
>>>> There must be something wrong if it takes less to solve 80eqs than 8,
>>>> no matter what.
>>>> Why don't you save/print split times for each eq and each phase of the
>>>> program?
>>>> With 8 it isn't crazy to analyze the numbers.
>>>>
>>>>
>>>> Claudio
>>>> -- Sent from my ParanoidAndroid Galaxy Nexus with K-9 Mail.
>>>>
>>>> _______________________________________________
>>>> zeromq-dev mailing list
>>>> [email protected]
>>>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>>>
>>>>
>>>
>>
>> _______________________________________________
>> zeromq-dev mailing list
>> [email protected]
>> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>>
>>
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>
// pairTest.cpp : Defines the entry point for the console application.
//

#include "stdafx.h"
#include <zmq.h>
#include <zmq_utils.h>
#include <stdio.h>				
#include <stdlib.h>				
#include <vector>		
#include <windows.h>
#include <process.h>
#include <assert.h>

static int roundtrip_count = 0 ;
#define NEQUATION 8
#define NCYCLE 1 

/**
 * One worker of the benchmark.
 *
 * Each worker talks to main over two ZMQ_PAIR sockets:
 *   - m_socketReceive connects to "inproc://connect_<id>" (bound by main) and
 *     receives work items; a zero-length message tells the worker to stop.
 *   - m_socketSend binds "inproc://terminate_<id>" (main connects to it); the
 *     worker sends one zero-length message on it just before exiting.
 *
 * Both sockets are created and bound/connected in the constructor, i.e. on the
 * thread that constructs the object. Socket setup is therefore not part of
 * anything the caller times after starting the worker thread. After that, the
 * sockets are used and closed only by the worker thread. ZeroMQ sockets are not
 * thread-safe, but they may be handed to another thread across a full memory
 * barrier, and starting the thread provides one.
 *
 * Precondition: "inproc://connect_<id>" must already be bound when the object
 * is constructed, because older libzmq releases require an inproc bind to
 * precede the connect.
 */
class ComputingThread
{

public:

	ComputingThread(void * ctx, const int& id)
		: m_ctx(ctx), m_id(id), m_socketReceive(NULL), m_socketSend(NULL), m_result(0.0)
	{
		char connText[128];

		// Completion channel: main connects to this endpoint.
		this->m_socketSend = zmq_socket(this->m_ctx, ZMQ_PAIR);
		sprintf(connText, "inproc://terminate_%d", this->m_id);
		int rc = zmq_bind(this->m_socketSend, connText);
		assert(rc == 0);

		// Work channel: connect to the endpoint main has already bound.
		this->m_socketReceive = zmq_socket(this->m_ctx, ZMQ_PAIR);
		sprintf(connText, "inproc://connect_%d", this->m_id);
		rc = zmq_connect(this->m_socketReceive, connText);
		assert(rc == 0);
		(void)rc;
	}

	/// _beginthreadex entry point; pThis must point to a live ComputingThread.
	static unsigned __stdcall ThreadStaticEntryPoint(void * pThis)
	{
		ComputingThread * pthX = static_cast<ComputingThread *>(pThis);
		return static_cast<unsigned>(pthX->worker());
	}

	/// Sum of the results of all work items processed by worker().
	/// Read it only after the worker thread has exited.
	double result() const { return m_result; }

	/**
	 * Thread body. Receives work items until a zero-length message arrives or a
	 * receive fails, running crunch() once per item. It then sends one
	 * zero-length message on m_socketSend and closes both sockets.
	 * Always returns 0.
	 */
	int worker()
	{
		zmq_msg_t msg;
		int rc = zmq_msg_init(&msg);

		double total = 0.0;
		while (true)
		{
			rc = zmq_recvmsg(this->m_socketReceive, &msg, 0);
			// A receive error or an empty message both mean "stop".
			if (rc == -1 || zmq_msg_size(&msg) == 0)
				break;
			total += crunch();
		}
		// Keep the result observable so the optimizer cannot drop the
		// floating-point work as dead code.
		m_result = total;

		// Tell main this worker is done.
		zmq_msg_t msgTerm;
		rc = zmq_msg_init(&msgTerm);
		rc = zmq_sendmsg(this->m_socketSend, &msgTerm, 0);
		zmq_msg_close(&msgTerm);

		zmq_msg_close(&msg);

		zmq_close(this->m_socketReceive);
		zmq_close(this->m_socketSend);
		this->m_socketReceive = NULL;
		this->m_socketSend = NULL;

		(void)rc;
		return 0;
	}

private:
	/// One unit of work: NCYCLE * 100000 iterations of transcendental math.
	static double crunch()
	{
		double sum = 0.0;
		for (int ic1 = 0; ic1 < NCYCLE; ic1++)
		{
			for (int ic2 = 0; ic2 < 100000; ic2++)
			{
				int v = rand() % 100;
				sum = sum + sin(float(v)) + sqrt(float(v) + cos(sqrt(float(v))));
			}
		}
		return sum;
	}

	// Not copyable: the object owns raw socket handles (declared, never defined).
	ComputingThread(const ComputingThread &);
	ComputingThread & operator=(const ComputingThread &);

	void * m_ctx;
	int m_id;

	void * m_socketReceive;
	void * m_socketSend;
	double m_result;
};




int main(int argc, const char* const argv[])
{
	std::vector<ComputingThread * > computingThreads ;
	std::vector< HANDLE> handles ;
	std::vector<void *> socketsSend ;
	std::vector<void *> socketsReceive ;
	std::vector<zmq_msg_t> messages ;

	void *watch = NULL;

	int nequation = NEQUATION ;

	int nthread = 8 ;
	unsigned long elapsed = 0;

	printf("\nSerial Started...\n") ;

	watch = zmq_stopwatch_start();

	int iequation = 0;
	for(iequation = 0 ; iequation  < nequation ; iequation++)		
	{

		int ic1 = 0 ;
		int ic2 = 0 ;
		double sum = 0.0 ;
		for(ic1 = 0 ; ic1 < NCYCLE ; ic1++)
		{
			for(ic2 = 0 ; ic2 < 100000 ; ic2++)
			{
				int v = rand() % 100;
				sum = sum + sin(float(v)) + sqrt(float(v) + cos(sqrt(float(v)))) ;
			}

		}

	}

	unsigned long serialElapsed = zmq_stopwatch_stop(watch) / 2 ;

	printf("\nSerial Done.   Took %ld microsec.\n", serialElapsed) ;


	void *ctx = NULL;
	void * socket = NULL ;

	int rc = 0 ;




	ctx = zmq_init(1);

	// create sockets
	int ithread = 0 ;
	for(ithread = 0 ; ithread < nthread ; ithread++)
	{
		void * socket = zmq_socket(ctx, ZMQ_PAIR);
		socketsSend.push_back(socket);

		socket = zmq_socket(ctx, ZMQ_PAIR);
		socketsReceive.push_back(socket);


	}


	// connections
	for(ithread = 0 ; ithread < nthread ; ithread++)
	{
		void * socket = socketsSend[ithread];
		char connectionString[128];
		sprintf(connectionString,"inproc://connect_%d",ithread);
		rc = zmq_bind(socket, connectionString);

	}


	for(ithread = 0 ; ithread < nthread; ithread++)
	{
		ComputingThread * cthr = new ComputingThread(ctx, ithread);
		computingThreads.push_back(cthr);
		HANDLE localHandle = (HANDLE) _beginthreadex(NULL, 0, ComputingThread::ThreadStaticEntryPoint, 
			cthr, 0, NULL);
		if (localHandle == 0) 
		{
			printf ("error in _beginthreadex %d\n",ithread);
			return -1;
		}
		handles.push_back(localHandle);
	}


	for(ithread = 0 ; ithread < nthread ; ithread++)
	{

		char connectionString[128];
		sprintf(connectionString,"inproc://terminate_%d",ithread);
		void * socket = socketsReceive[ithread];
		rc = zmq_connect (socket,connectionString);


	}



	int i = 0 ;
	printf("\nParallel Started...\n") ;

	int nfinish = nthread ;
	zmq_msg_t msgTerm;
	rc = zmq_msg_init (&msgTerm);


	watch = zmq_stopwatch_start();

	// send messages
	int messageCounter = 0 ;

	for(iequation = 0 ; iequation  < nequation ; iequation++)		
	{
		zmq_msg_t msg;
		rc = zmq_msg_init (&msg);
		rc = zmq_msg_init_size(&msg, 8);
		memset (zmq_msg_data (&msg), 'A', 8);
		ithread = messageCounter % nthread ;
		messageCounter++ ;
		void * socket = socketsSend[ithread];
		rc = zmq_sendmsg (socket, &msg, 0);
		zmq_msg_close(&msg);
	}


	// terminate threads
	for(ithread = 0; ithread < nthread ; ithread++)
	{
		zmq_msg_t msgTerm;
		rc = zmq_msg_init (&msgTerm);
		void * socket = socketsSend[ithread];
		rc = zmq_sendmsg (socket, &msgTerm, 0);
		zmq_msg_close(&msgTerm);

	}


	int flags[16] = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1};

	while(nfinish > 0)
	{
		for(ithread = 0; ithread < nthread ; ithread++)
		{
			if(flags[ithread])
			{
				void * recv = socketsReceive[ithread];
				rc = zmq_recvmsg (recv, &msgTerm, 0);
				int messageSize = zmq_msg_size (&msgTerm) ;
				if(messageSize == 0)
				{
					flags[ithread]  = 0;
					nfinish-- ;
					break ;
				}

			}
		}
		
	}


	unsigned long parallelElapsed = zmq_stopwatch_stop(watch) / 2 ;

	printf("\nParallel Done. Took %ld microsec.\n", parallelElapsed) ;

	double speedup = float(serialElapsed) / parallelElapsed ;

	printf("\nspeedup = %g\n", speedup) ;


	DWORD rc2 = WaitForMultipleObjects( nthread, &handles[0], true, INFINITE);   
	if (rc2 == WAIT_FAILED) 
	{
		printf ("error in WaitForMultipleObject\n");
		return -1;
	}
	assert( ( rc2 >= WAIT_OBJECT_0 ) && ( rc2 <= ( WAIT_OBJECT_0 + nthread - 1 ) ) );





	//close handles
	for(ithread = 0; ithread < nthread ; ithread++)
	{
		HANDLE localHandle = handles[ithread];
		BOOL rcc = CloseHandle (localHandle);
		if (rcc == 0) 
		{
			printf ("error in CloseHandle\n");
			return -1;
		}


	}


	for(ithread = 0 ; ithread < nthread ; ithread++)
	{
		void * socket = socketsSend[ithread];
		rc = zmq_close (socket);
		socket = socketsReceive[ithread];
		rc = zmq_close (socket);


	}

	zmq_term(ctx);

	return 0;
}

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to