Hi,

In my output plugin (see attached code), because I need to process and send
the messages to some other daemon, I created a new pthread to listen/read
response from that daemon.  All the code works well when I ran by starting
rsyslog in manual  mode.

/usr/sbin/rsyslogd -f/etc/rsyslog.conf -u2 -n -irsyslog.pid
-M/usr/lib/rsyslog

However, when I ran my plugin with rsyslogd as service, it seems that my
new thread is started but it is not running properly. I saw the thread
print out a startup msg but it didn't continue printing new msg as expected.

What's wrong with my code? is there limitation on using new pthread?

Liwei
/** omazuremds.c
    
    This is the output module feeding directly to mdsd. Its inputs are rsyslog messages.
    It will send outputs, which are an array of json object strings to the mdsd agent.
    If any network failure occurs, including mdsd not working, data will be resent to mdsd.
    
    It uses batching for the input message processing. Each batch will be sent to mdsd agent in
    one send() call. After send(), it will read response from mdsd to validate whether proper data
    are received. If unexpected response is received, it will report error and resend the batch.

    Required configuration parameters:
    - queue.dequeuebatchsize: this defines the batch size. It is defined in rsyslog conf file.
    - mdsdport: this defines the mdsd agent port. It is defined in rsyslog conf file.
    - others: see the related rsyslog conf file for other parameters related to performance and reliability.

*/

#include "config.h"
#include "rsyslog.h"
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <signal.h>
#include <errno.h>
#include <unistd.h>
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
#include "template.h"
#include "module-template.h"
#include "errmsg.h"
#include "cfsysline.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <time.h>
#include <poll.h>
#include "hashtable.h"
#include "hashtable_private.h"
#include <ctype.h>

MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
MODULE_CNFNAME("omazuremds") /* define the module name */

static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal);


/* internal structures
 */
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)

const int INVALID_SOCKET = -1;  /* invalid socket value */


/* structure defining a batch of items to be
   sent to the mdsd in one send(). */
struct mdsd_batch
{
  /* number of items in dataList.*/
  int n;  

  /* An array of JSON object strings */
  char** dataList;
};

/* Each rsyslog process will have one instanceData instance.
   Use a hashtable cache to store the tag/json objects:
   Use a reader thread to read data from mdsd; if receiving any tag from mdsd, remove it from the hash.
   If reader thread read fails, mark the read failure.
*/
typedef struct _instanceData {
    int batchSize;    /* queue.dequeuebatchsize; this is the batch size.*/
    
    int mdsdPort;     /* mdsd port number */
    pthread_mutex_t mutBatch; /* lock for batch object changes. */

    int mdsd_sock;  /*socket to communicate with MDSD */

    /* hash cache to store all json string.  key=tag; value: json object string.*/
	struct hashtable *cacheTable;

	pthread_t readerThread; /* mdsd reader thread to read data from mdsd.*/
	int readOK;  /* 0: not OK for reader thread to read; 1: OK for reader thread to read. */
} instanceData;

/* Each rsyslog process may have N workerInstances, depending number of worker queues.*/
typedef struct wrkrInstanceData {
	instanceData *pData;
	struct mdsd_batch batch;
	char tagBase[16]; /* to make tag to be unique, create a unique base string first.*/
} wrkrInstanceData_t;

/* parameters defined in rsyslog conf file.*/
static struct cnfparamdescr actpdescr[] = {	
	{ "template", eCmdHdlrGetWord, 1 },
	{ "queue.dequeuebatchsize", eCmdHdlrGetWord, 1 },
	{ "mdsdport", eCmdHdlrGetWord, 1 }
};

static struct cnfparamblk actpblk = {
	CNFPARAMBLK_VERSION,
	sizeof(actpdescr)/sizeof(struct cnfparamdescr),
	actpdescr
};

/* (re)set config variables to default values */
BEGINinitConfVars
CODESTARTinitConfVars 
	resetConfigVariables(NULL, NULL);
ENDinitConfVars

int CreateReaderThread(instanceData* pData);

/* Resource allocation: create a new pData instance and initialize it.
 * Each process has only 1 such an instance.
 */
BEGINcreateInstance

CODESTARTcreateInstance
    ASSERT(pData != NULL);
	pData->batchSize = 0;
	
	pthread_mutex_init(&pData->mutBatch, NULL);
	pData->mdsd_sock = INVALID_SOCKET;
	pData->cacheTable = create_hashtable(pData->batchSize, hash_from_string, key_equals_string, NULL);

	pData->readerThread = 0;
	pData->readOK = 0;
	if (CreateReaderThread(pData) > 0)
	{
	   dbgprintf("omazuremds: createInstance error at CreateReaderThread\n");
	   iRet=RS_RET_ERR;
	}

ENDcreateInstance


/* Defines worker instance. Each rsyslog process has only 1 instance (InstanceData).
 * It may have multiple worker instances defined by multiple worker queues.
 * The batch log data need to be tracked in worker instance.
 *
 * The macros define pWrkrData, where WrkrData->pData = pData.
 */
BEGINcreateWrkrInstance
	int batchSize;

CODESTARTcreateWrkrInstance
	pWrkrData->batch.n = 0;
	batchSize = pData->batchSize;
	pWrkrData->batch.dataList = (char**)malloc(batchSize * sizeof(char*));
	snprintf(pWrkrData->tagBase, sizeof(pWrkrData->tagBase), "%d:", (int)time(0));
ENDcreateWrkrInstance
    

BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
	if(eFeat == sFEATURERepeatedMsgReduction)
		iRet = RS_RET_OK;
ENDisCompatibleWithFeature

/* free any resource allocated in createInstance. */
BEGINfreeInstance
CODESTARTfreeInstance
	pthread_mutex_destroy(&pData->mutBatch);
	pData->batchSize = 0;	
	
	hashtable_destroy(pData->cacheTable, 1);
	if (pData->readerThread > 0) {
		pthread_cancel(pData->readerThread);
		pData->readerThread = 0;
	}
	pData->readOK = 0;
ENDfreeInstance


/* Free the worker instance resources */
BEGINfreeWrkrInstance
CODESTARTfreeWrkrInstance
	free(pWrkrData->batch.dataList);
	pWrkrData->batch.dataList = NULL;
	pWrkrData->batch.n = 0;
ENDfreeWrkrInstance


BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
ENDdbgPrintInstInfo


/* Return 1 if str is NULL/empty/whitespaces. Otherwise, return 0.
 */
int IsEmptyOrWhiteSpace(const char * str)
{
	int isEmpty = 1;
	size_t i;

	if (str != NULL) {
		for(i = 0; i < strlen(str); i++) {
			if (!isspace(str[i])) {
				isEmpty = 0;
				break;
			}
		}
	}
	return isEmpty;
}


/* Create a new string that'll combine all strings in a given array, return the new string.
   The new string memory needs to be freed by caller.

   inputs:
     - strArray: string array.
     - nitems: number of items in string array.

   return: new combined string. It must be freed by caller.
   */
char* CreateNewString(char** strArray, int nitems)
{
	char* newStr = NULL;
	int totalLen = 0;
	int i = 0;
	int strIndex=0;
	size_t itemLen = 0;

	if (strArray == NULL || nitems <= 0) {
		return newStr;
	}
	for (i = 0; i < nitems; i++)
	{
		totalLen += strlen(strArray[i]);
	}

	newStr = (char*)malloc(totalLen+1);

	for (i = 0; i < nitems; i++)
	{
		itemLen = strlen(strArray[i]);
		memcpy(newStr+strIndex, strArray[i], itemLen);
		strIndex += itemLen;
	}
	newStr[totalLen]='\0';
	return newStr;
}


/* set up connection to MDSD agent socket.
 * There are seveval socket failure cases:
 *   - cannot do socket()/connect(): close socket and re-create.
 *   - send data to mdsd fails: close and recreate socket.
 *   - read mdsd data fails: no need to close socket/recreate.
 *
 * Return: error code.
 */
rsRetVal SetupConnectionWithMdsd(instanceData* pData)
{
	DEFiRet;
	struct sockaddr_in addr;
	int retries;
	int ret;
	char errorstr[256];
	char* errRC;
	const int NETWORK_RETRY=10;
	
	assert(pData != NULL);

	memset((char *) &addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(pData->mdsdPort);
	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

	pthread_mutex_lock(&pData->mutBatch);

	dbgprintf("omazuremds: setupConnectionWithMdsd socket: %d ...\n", pData->mdsd_sock);
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "omazuremds: setupConnectionWithMdsd socket: %d ...\n", pData->mdsd_sock);
#endif

	if(pData->mdsd_sock == INVALID_SOCKET) {
		retries = 0;
		while(1) {
			dbgprintf("omazuremds: setupConnectionWithMdsd creating new socket fd=%d\n", pData->mdsd_sock);
			if((pData->mdsd_sock=socket(AF_INET, SOCK_STREAM , 0))==-1) {
				errRC = strerror_r(errno, errorstr, sizeof(errorstr));
				errmsg.LogError(0, RS_RET_ERR, "omazuremds error at socket(). errno=%s\n", errRC);
				ABORT_FINALIZE(RS_RET_SUSPENDED);
			}

			if((ret = connect(pData->mdsd_sock, (struct sockaddr*)&addr, sizeof(addr))) != 0) {
				if(retries++ == NETWORK_RETRY) {
					errRC = strerror_r(errno, errorstr, sizeof(errorstr));
					errmsg.LogError(0, RS_RET_ERR, "omazuremds error at connect(). errno=%s\n", errRC);
					ABORT_FINALIZE(RS_RET_SUSPENDED);
				} else {
					close(pData->mdsd_sock);
					pData->mdsd_sock = INVALID_SOCKET;
					usleep(100000); /* 100 ms = 100,000 us */
				}
			}
			else {
				break;
			}
		}
	}

finalize_it:
	if(iRet != RS_RET_OK) {
		close(pData->mdsd_sock);
		pData->mdsd_sock = INVALID_SOCKET;
	}
	dbgprintf("omazuremds: done with setupConnectionWithMdsd. iRet=%d\n", iRet);
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE,"omazuremds: done with setupConnectionWithMdsd. iRet=%d\n", iRet);
#endif
	pthread_mutex_unlock(&pData->mutBatch);

	RETiRet;
}


/* Add k=tagStr, v=jsonStr to cache table.
 * Multiple workers may call this. Need to be thread safe.
 */
int AddDataToCache(instanceData* pData, char* tagStr, char* jsonStr)
{
	int nerrs = 0;
	int rc = 0;

	assert(pData != NULL);
	assert(tagStr != NULL);
	assert(jsonStr != NULL);

	dbgprintf("omazuremds AddDataToCache tag='%s'\n", tagStr);
#ifdef DEBUGMDS    
    errmsg.LogError(0, RS_RET_NONE, "omazuremds AddDataToCache tag='%s'\n", tagStr);
#endif
	pthread_mutex_lock(&pData->mutBatch);
	rc = hashtable_insert(pData->cacheTable, tagStr, jsonStr);
	pthread_mutex_unlock(&pData->mutBatch);

	if (!rc)
	{
        errmsg.LogError(0, RS_RET_ERR, "omazuremds AddDataToCache error: tag='%s', value='%s'\n", tagStr, jsonStr);
		nerrs++;
	}

	return nerrs;
}


/*
 * Remove entry matching a given TAG from the cache table. Free the memory of the tag and JSON object.
 */
void RemoveDataFromCache(instanceData* pData, char* tagStr)
{
	char* foundJson = NULL;

	assert(pData != NULL);
	assert(tagStr != NULL);
	if (IsEmptyOrWhiteSpace(tagStr)) {
		dbgprintf("omazuremds RemoveDataFromCache: error. unexpected tagStr: empty or white space\n");
		return;
	}

	pthread_mutex_lock(&pData->mutBatch);
	/* hashtable_remove() will free the found entry key*/
	foundJson = hashtable_remove(pData->cacheTable, tagStr);
	pthread_mutex_unlock(&pData->mutBatch);
	if (foundJson)
	{
		free(foundJson);
		foundJson = NULL;
	}
	else
	{
		/* For resent objects, they could be already removed. */
		dbgprintf("omazuremds RemoveDataFromCache: warning: object not found: tag='%s'; value=NULL\n", tagStr);
	}
}

/* Get number of items in the cache table. */
unsigned int GetCacheCount(instanceData* pData)
{
	unsigned int count = 0;
	assert(pData != NULL);
	pthread_mutex_lock(&pData->mutBatch);
	count = hashtable_count(pData->cacheTable);
	pthread_mutex_unlock(&pData->mutBatch);
	return count;
}

/*
 * Parse mdsd response string to get the tag\n. For each tag, remove it from the
 * cache and free the JSON object string memory.
 *
 * responseStr may contain incomplete data from mdsd. example: 'tag1\ntag2', where
 * there is no \n at the end of tag2. The incomplete data tag2 won't be processed.
 *
 * Return: number of bytes in responseStr that are processed.
 */
int ProcessMdsdResponse(instanceData* pData, const char* responseStr)
{
	const char delimiter = '\n';
	char delimiters[2] = {delimiter, '\0'};
	char *cp, *tag, *saveptr;
	char *partialData;
	size_t totalLen = 0;
	size_t dataLen = 0;

	assert(pData != NULL);
	assert(responseStr != NULL);

	totalLen = strlen(responseStr);
	dataLen = totalLen;

	if (responseStr[totalLen-1] != delimiter) {
		partialData = strrchr(responseStr, delimiter);
		if (partialData) {
			dataLen = partialData - responseStr + 1;
		}
		else {
			dataLen = 0;
		}
	}
	cp = (char*)malloc(dataLen+1);
	strncpy(cp, responseStr, dataLen);
	cp[dataLen] = '\0';

    tag = strtok_r(cp, delimiters, &saveptr);
	if (tag != NULL)
	{
		RemoveDataFromCache(pData, tag);
		while(1) {
            tag = strtok_r(NULL, delimiters, &saveptr);

            if (tag == NULL) {
                break;
			}

			RemoveDataFromCache(pData, tag);
	   }
	}
	free(cp);
	cp = NULL;
	return dataLen;
}


void SetReadOK(instanceData *pData, int newValue)
{
	assert(newValue == 1 || newValue == 0);
	pthread_mutex_lock(&pData->mutBatch);
	pData->readOK = newValue;
	pthread_mutex_unlock(&pData->mutBatch);
}

int ResendCacheData(instanceData* pData);

/* Read data from mdsd and validate the results.
 * If read() fails, return to caller. Otherwise, use a loop to continue reading from mdsd.
 * Once data are read, they are processed assumeing format TAG1\nTAG2\n.... Partial data are
 * string that doesn't end with '\n'. They are saved and will add up to next read().
 *
 * returns: number of errors.
 */
int ReadDataFromMdsdOnce(instanceData* pData)
{
	int n;
	int nerrs = 0;
	char* responseBuf;
	const int initBufLen = 1024;
	int trigger = initBufLen/2;
	int leftLen = initBufLen; /* left free space in buffer */
	int bufLen = initBufLen;
	responseBuf = (char*) malloc(bufLen+1);
	int index = 0;

	char errorstr[256];
	char* errRC;
	int nProcessed = 0;
	int nNotProcessed = 0;
    
    int resendTime = 30; /* in seconds. frequencey to do the resend*/
    struct timespec prevClock;
    struct timespec currClock;

	assert(pData != NULL);
    if (INVALID_SOCKET == pData->mdsd_sock) {
#ifdef DEBUGMDS
        errmsg.LogError(0, RS_RET_NONE, "omazuremds: ReadDataFromMdsdOnce. error. sock=%d\n", pData->mdsd_sock);
#endif
        dbgprintf("omazuremds: ReadDataFromMdsdOnce. error. sock=%d\n", pData->mdsd_sock);
        nerrs++;
        return nerrs;
    }       

    clock_gettime(CLOCK_MONOTONIC_RAW, &prevClock);

	while(1)
	{
#ifdef DEBUGMDS
        errmsg.LogError(0, RS_RET_NONE, "omazuremds: ReadDataFromMdsdOnce start to read()\n");
#endif
		n = read(pData->mdsd_sock, responseBuf+index, leftLen);
#ifdef DEBUGMDS
		errmsg.LogError(0, RS_RET_NONE, "omazuremds: ReadDataFromMdsdOnce read() return = %d\n", n);
#endif
		if (n == 0) {
			errRC=strerror_r(errno, errorstr, sizeof(errorstr));
			dbgprintf("omazuremds: ReadDataFromMdsdOnce: read 0 bytes. errno=%s\n", errRC);
			nerrs++;			
		}
		else if (n == -1)
		{
			errRC=strerror_r(errno, errorstr, sizeof(errorstr));
			errmsg.LogError(0, RS_RET_ERR, "omazuremds: ReadDataFromMdsdOnce: read() error. errno=%s\n", errRC);
			nerrs++;			
		}
        else {
            leftLen -=  n;
            index += n;
            responseBuf[bufLen-leftLen] = '\0';

            dbgprintf("omazuremds: ReadDataFromMdsdOnce: responseBuf='%s'\n", responseBuf);
            nProcessed = ProcessMdsdResponse(pData, responseBuf);
            if (nProcessed > 0) {
                /* handle partial TAG data from response */
                nNotProcessed = bufLen-leftLen - nProcessed;
                memmove(responseBuf, responseBuf+nProcessed, nNotProcessed+1); /* also move the '\0'.*/

                leftLen += nProcessed;
                index -= nProcessed;
            }

            if (leftLen <= trigger) {
                leftLen += initBufLen;
                bufLen += initBufLen;
                responseBuf = (char*)realloc(responseBuf, bufLen+1);
            }
        }

        /* check whether we need to do resend*/        
        clock_gettime(CLOCK_MONOTONIC_RAW, &currClock);
        if ((int)(currClock.tv_sec - prevClock.tv_sec) >= resendTime) {
            nerrs += ResendCacheData(pData);
            clock_gettime(CLOCK_MONOTONIC_RAW, &prevClock);
            if (nerrs > 0) {
                break;
            }
        }
	}

	free(responseBuf);
	responseBuf = NULL;
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "omazuremds: ReadDataFromMdsdOnce. nerrs=%d\n", nerrs);
#endif
	dbgprintf("omazuremds: ReadDataFromMdsdOnce. nerrs=%d\n", nerrs);
	return nerrs;
}

/* Use socket send() to send string to mdsd port. Return error and close the socket
 * if send() fails.
 */
int SendDataToMdsd(instanceData* pData, char* jsonObjectListStr)
{
	int nerrs = 0;
	int sendRet = 0;
	int jsonListLen = strlen(jsonObjectListStr);

	assert(pData != NULL);
	assert(jsonObjectListStr != NULL);
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "omazuremds: start SendDataToMdsd: %s ...\n", jsonObjectListStr);
#endif
	dbgprintf("omazuremds: start SendDataToMdsd: %s ...\n", jsonObjectListStr);
	if (SetupConnectionWithMdsd(pData) != RS_RET_OK)
	{
		nerrs++;
	}
	else {
		sendRet = send(pData->mdsd_sock, jsonObjectListStr, jsonListLen, 0);

		if (sendRet != jsonListLen) {
			errmsg.LogError(0, RS_RET_ERR, "omazuremds error at send() failed, sock=%d, ret=%d\n",
					pData->mdsd_sock, sendRet);
			nerrs++;
            
            pthread_mutex_lock(&pData->mutBatch);
			close(pData->mdsd_sock);
			pData->mdsd_sock = INVALID_SOCKET;
            pthread_mutex_unlock(&pData->mutBatch);
			
            /* There must be something wrong with socket, so set ReadOK=0.*/
			SetReadOK(pData, 0);
		}
		else {
			/* data were sent; reader may start read */
			SetReadOK(pData, 1);
		}
	}
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "omazuremds: SendDataToMdsd done. nerrs=%d\n", nerrs);
#endif
	return nerrs;
}

/*
 * Resend any data in the cache table to mdsd; report error if any.
 * Return: number of errors.
 */
int ResendCacheData(instanceData* pData)
{
	int nerrs = 0;
	unsigned int count = 0;
	char **strArray = NULL;
	int index = 0;
	char * jsonObjectListStr = NULL;
	unsigned int j = 0;
	int batchSize = pData->batchSize;	
	unsigned int i = 0;
	struct entry * item = NULL;	

	assert(pData != NULL);
#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "ResendCacheData start\n");
#endif
	pthread_mutex_lock(&pData->mutBatch);

	count = pData->cacheTable->entrycount;

	if (count > 0) {
		strArray = (char**)malloc(count * sizeof(char*));

		for (i = 0; i < pData->cacheTable->tablelength; i++) {
			item = pData->cacheTable->table[i];
			while (NULL != item) {
				strArray[index++] = (char*) item->v;
				item = item->next;
			}
		}
	}
	pthread_mutex_unlock(&pData->mutBatch);

	for (j = 0; j < (count / batchSize); j++) {
		jsonObjectListStr = CreateNewString(strArray+j*batchSize, batchSize);
		nerrs += SendDataToMdsd(pData, jsonObjectListStr);
		free(jsonObjectListStr);
		jsonObjectListStr = NULL;
	}
	if (count%batchSize > 0)
	{
		jsonObjectListStr = CreateNewString(strArray+j*batchSize, count - j*batchSize);
		nerrs += SendDataToMdsd(pData, jsonObjectListStr);
		free(jsonObjectListStr);
		jsonObjectListStr = NULL;
	}

	free(strArray);
	strArray = NULL;

#ifdef DEBUGMDS
    errmsg.LogError(0, RS_RET_NONE, "ResendCacheData count=%d. nerrs=%d\n", count, nerrs);
#endif
	return nerrs;
}


/*
 * Read data from mdsd. It is called in a new thread.
 *
 * failure handling:
 *    - If cache is empty, which means no data are sent, mark readOk = 0;
 *    - If either resend failure or read fails, mark readOK = 0;
 *    - If there are previous failures and readOK = 1, cache may not be empty. Resend.
 *    - If readOK  0, wait until readOK=1 before read.
 */
void* ReadDataFromMdsd(void* vpData)
{
	int nSendErrs = 0;
	int nReadErrs = 0;
	instanceData* pData = (instanceData*)vpData;
	
	const int sleepMS = 100; /* 100 milli-seconds */
	int count = 0;
	const int MaxCount = 100;
    
	assert(vpData != NULL);

	while(1) {
		/* Don't block this forever because if no msg is sent(), no way to reset ReadOK. */
		count = 0;
#ifdef DEBUGMDS
        errmsg.LogError(0, RS_RET_NONE, "ReadDataFromMdsd readOK=%d\n", pData->readOK);
#endif
		while((0 == pData->readOK) && (count < MaxCount))
		{
			usleep(sleepMS*1000);
			count++;
		}
#ifdef DEBUGMDS
        errmsg.LogError(0, RS_RET_NONE, "ReadDataFromMdsd readOK=%d. count=%d\n", pData->readOK, count);
#endif
		nSendErrs = ResendCacheData(pData);

		if (nSendErrs > 0) {
            dbgprintf("omazuremds: ReadDataFromMdsd: resend error: nerrs=%d\n", nSendErrs);
			continue;
		}                

		nReadErrs = ReadDataFromMdsdOnce((instanceData*)pData);

		if (nReadErrs > 0) {
			dbgprintf("omazuremds: ReadDataFromMdsd: read error: nerrs=%d\n", nReadErrs);
			SetReadOK(pData, 0);			
		}
		else {
			/* some data is read. so it is good to continue */
			SetReadOK(pData, 1);
		}
		if (0 == GetCacheCount(pData)) {
			/* no data to read */
			SetReadOK(pData, 0);
			dbgprintf("omazuremds: ReadDataFromMdsd: cachetable is empty\n");
		}
	}

	pthread_exit(pData);
}

/* Create a new thread that'll listen on mdsd port and read data from it.
 * Because data can come from mdsd at any time, we'll keep the thread alive.
 */
int CreateReaderThread(instanceData* pData)
{
	int nerrs = 0;
	pthread_attr_t attr;
	int rc;

	assert(pData != NULL);

	if (pData->readerThread > 0)
	{
		dbgprintf("omazuremds: CreateReaderThread: already created: id=%ld\n", pData->readerThread);
		return nerrs;
	}

	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    
     pthread_attr_setstacksize(&attr, 4096*1024);

	rc = pthread_create(&pData->readerThread, &attr, ReadDataFromMdsd, (void *)pData);
	if (rc)
	{
		nerrs++;
		dbgprintf("omazuremds: CreateReaderThread error. error=%d\n", rc);
		pData->readerThread = 0;
	}
	pthread_attr_destroy(&attr);

	dbgprintf("omazuremds: CreateReaderThread done. nerrs=%d\n", nerrs);
	return nerrs;
}

/* Send worker queue's batch to mdsd.
 * Inputs:
 *   pWrkrData: worker instance.
 *   comment: some comment for logging purpose only.
 * Returns: number of errors.
 */
int SendBatchData(wrkrInstanceData_t *pWrkrData, const char* comment)
{
	instanceData *pData;
	char* jsonObjectListStr;
	int nerrs = 0;

	assert(pWrkrData != NULL);
	assert(comment != NULL);

	if (pWrkrData->batch.n == 0) {
		return nerrs;
	}

	pData = pWrkrData->pData;
	dbgprintf("omazuremds: sendBatchData %s BatchSize=%d; #Items=%d\n",
			  comment, pData->batchSize, pWrkrData->batch.n);

	jsonObjectListStr = CreateNewString(pWrkrData->batch.dataList, pWrkrData->batch.n);

	nerrs += SendDataToMdsd(pData, jsonObjectListStr);

	free(jsonObjectListStr);
	jsonObjectListStr = NULL;

	/* mark worker batch data to be empty */
	pWrkrData->batch.n = 0;

	dbgprintf("omazuremds: finished sendBatchData. nerrs=%d\n", nerrs);
	return nerrs;
}

/* expected valid source format: "<source>". If <source> is empty, or not double-quoted, report error.
   Return 1 if valid. 0 is  invalid.
*/
int IsValidSourceFormat(const char* src)
{   
    int len = 0;
    int i = 0;
    int isValid = 1;
    assert(src != NULL);
    
    if (IsEmptyOrWhiteSpace(src)) {
        dbgprintf("omazuremds: CreateMdsdJson error: unexpected source value: empty or white space\n");
        return 0;
    }
    len = strlen(src);
    if ('"' != src[0] || '"' != src[len-1]) {
        dbgprintf("omazuremds: CreateMdsdJson error: unexpected source value: '%s'\n", src);
        return 0;
    }
    
    for (i = 1; i < len-1; i++) {
        if ('"' == src[i]) {
            dbgprintf("omazuremds: CreateMdsdJson error: unexpected source value: '%s'\n", src);
            isValid = 0;
            break;
        }
    }
    return isValid;        
}


/* Create a unique TAG value given a string base.
 * It is used in the JSON object, which must have unique TAG value.
 * CLOCK_MONOTONIC_RAW will get clock time since computer reboots. To make tag really unique,
 * add the tagBase, which is the Epoch time.
 *
 * Inputs:
 *   tagBase: some base string to contruct unique value.
 *
 * Returns: a unique string that can be used as TAG value.
 */
char * CreateUniqueTag(char * tagBase)
{
	char *tagStr;
	struct timespec clock;

	assert(tagBase != NULL);
	clock_gettime(CLOCK_MONOTONIC_RAW, &clock);
	tagStr = (char*)malloc(strlen(tagBase)+32); /* allocate large enough, extra space for added numbers. */
	sprintf(tagStr, "%s%d.%ld", tagBase, (int)clock.tv_sec, clock.tv_nsec);

	return tagStr;
}

/* Create a json object given the message log string;
 * The logString format is facility,<others>
 * Facility will be used as the source.
 * {"TAG":"<tag>", "SOURCE":"<source>", "DATA":["a","b"]}
 *
 * Input:
 *   tagStr: JSON object tag.
 *   logString: log string that'll be used in JSON's DATA part.
 * Returns: a new JSON object string. Its memory needs to be freed by caller.
 */
char* CreateMdsdJson(char* tagStr, char* logString)
{
	int i;
	int sourceLen;
	int totalLen;
	char * source;
	size_t jsonLen;
	char *jsonString;

	/* if no source found in logString, use default.*/
	const char* defaultSrc = "\"local0\"";
	const char jsonFormat[] = "{\"TAG\":\"%s\", \"SOURCE\":%s,\"DATA\":[%s]}";

	/* need to handle logString without comma scenario. so make a new string for it.*/
	char *logStringNew = NULL;
	int useDefaultSrc = 0;

	assert(tagStr != NULL);
	assert(logString != NULL);

	if (IsEmptyOrWhiteSpace(tagStr)) {
		errmsg.LogError(0, RS_RET_ERR, "omazuremds: CreateMdsdJson error. unexpected tagStr value: empty or white space\n");
		return NULL;
	}
	if (IsEmptyOrWhiteSpace(logString)) {
		dbgprintf("omazuremds: CreateMdsdJson error: unexpected logString value: empty or white space\n");
		return NULL;
	}

	sourceLen = 0;
	totalLen = strlen(logString);
	for (i = 0; i < totalLen; i++)
	{
		if (logString[i] == ',') {
			sourceLen = i;
			break;
		}
	}
	if (sourceLen > 0) {
		source = (char*)malloc(sourceLen+1);
		strncpy(source, logString, sourceLen);
		source[sourceLen] = '\0';
        
        if (0 == IsValidSourceFormat(source)) {
            return NULL;
        }        
	}
	else {
		useDefaultSrc = 1;
		sourceLen = strlen(defaultSrc);
		source = (char*)malloc(sourceLen+1);
		strncpy(source, defaultSrc, sourceLen);
		source[sourceLen] = '\0';

		logStringNew = (char*)malloc(strlen(logString) + sourceLen + 8);
		sprintf(logStringNew, "%s,\"%s\"", source, logString);
	}

	/* give extra space to make sure we have enough space for the jason string */
	jsonLen = strlen(tagStr) + sourceLen + totalLen + sizeof(jsonFormat) + 64;
	jsonString = (char*)malloc(jsonLen * sizeof(char));

	if (useDefaultSrc) {
		sprintf(jsonString, jsonFormat, tagStr, source, logStringNew);
		free(logStringNew);
		logStringNew = NULL;
	}
	else {
		sprintf(jsonString, jsonFormat, tagStr, source, logString);
	}

	free(source);
	source = NULL;

	return jsonString;
}


/* Check whether MDS agent can be connected to properly.
 * May add ping msg checking in the future.
 */
rsRetVal InitMdsdConnection(wrkrInstanceData_t *pWrkrData)
{
    DEFiRet;
    CHKiRet(SetupConnectionWithMdsd(pWrkrData->pData));

finalize_it:
    RETiRet;
}

/* If beginTransaction/doAction/endTransaction is suspended (RS_RET_SUSPENDED),
 * resume will be called until it succeeds, or exit based on action configurations.
 * After resume succeeds, beginTransaction will start.
 */
BEGINtryResume
CODESTARTtryResume
dbgprintf("omazuremds: resume action\n");
CHKiRet(InitMdsdConnection(pWrkrData));
finalize_it:
ENDtryResume


/* Begin a batch loop: beginTransaction/DoAction/endTransaction;
 * If previous batch's send data fails, or something aborts before endTransaction,
 * use beginTransaction to retry sending them. If the retry in beginTransaction fails,
 * suspend the transaction.
 *
 */
BEGINbeginTransaction
CODESTARTbeginTransaction    
    dbgprintf("omazuremds: beginTransaction starts.\n");
    CHKiRet(InitMdsdConnection(pWrkrData));

finalize_it:
ENDbeginTransaction


/* This step handles the input message.
 * Each message will be saved into a data structure. When all messages for a batch
 * are saved, the messages will be sent to mdsd in endTransaction.
 *
 */
BEGINdoAction	
	char *logString;
    instanceData *pData;
    char *tagStr;
    char *jsonStr;

CODESTARTdoAction
	if (ppString != NULL) {
		pData = pWrkrData->pData;
		logString =  (char*)ppString[0];
		ASSERT(logString != NULL);
		dbgprintf("omazuremds: start doAction: msg='%s'\n", logString);

		if (pWrkrData->batch.n >= pData->batchSize) {
			errmsg.LogError(0, RS_RET_ERR, "omazuremds: unexpected error: out of space for batch. n=%d, batchSize=%d\n",
					pWrkrData->batch.n, pData->batchSize);
		}
		else {
			tagStr = CreateUniqueTag(pWrkrData->tagBase);
			jsonStr = CreateMdsdJson(tagStr, logString);

			if (jsonStr != NULL) {
				pWrkrData->batch.dataList[pWrkrData->batch.n] = jsonStr;
				pWrkrData->batch.n++;

				if (AddDataToCache(pData, tagStr, jsonStr) > 0) {
					iRet = RS_RET_SUSPENDED;
				}
			}
		}

		if (iRet == RS_RET_OK) {
			iRet = RS_RET_DEFER_COMMIT;
		}
	}
    dbgprintf("omazuremds: done doAction: iRet=%d\n", iRet);
ENDdoAction


/* This is called after a batch of messages are processed by doAction().
 * It will send data to mdsd. If send() fails, suspend the operation until resume succeeds.
 */
BEGINendTransaction
    int nerrs = 0;
CODESTARTendTransaction
	nerrs += SendBatchData(pWrkrData, "endTransaction");
    if (nerrs > 0) {
    	dbgprintf("omazuremds: endTransaction sendBatchData %d errors.\n", nerrs);
    	iRet = RS_RET_SUSPENDED;
    }
ENDendTransaction


/* Defines actions that'll be called for each new instance.
 * This function is called once only. It will get the rsyslog conf parameters
 * and also create the instanceData object.
*/
BEGINnewActInst
	struct cnfparamvals *pvals;
	int i;
    char* batchSizeStr;
	char* mdsdPortStr;
	char* tplName;
CODESTARTnewActInst	
	if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
	}

	CHKiRet(createInstance(&pData));
	CODE_STD_STRING_REQUESTparseSelectorAct(1)

	for(i = 0 ; i < actpblk.nParams ; ++i) {
		if(!pvals[i].bUsed)
			continue;

        if(strcmp(actpblk.descr[i].name, "template") == 0) {
			tplName = es_str2cstr(pvals[i].val.d.estr, NULL);
			dbgprintf("omazuremds: newact templateName = '%s'\n", tplName);
        }
		else if (strcmp(actpblk.descr[i].name, "queue.dequeuebatchsize") == 0)
		{
            batchSizeStr = es_str2cstr(pvals[i].val.d.estr, NULL);
            dbgprintf("omazuremds: newact batch size = '%s'\n", batchSizeStr);

            if (batchSizeStr != NULL) {
            	pData->batchSize = atoi(batchSizeStr);
            	free(batchSizeStr);
            	batchSizeStr = NULL;
            }
		}
		
		else if (strcmp(actpblk.descr[i].name, "mdsdport") == 0)
		{
			mdsdPortStr = es_str2cstr(pvals[i].val.d.estr, NULL);
			if (mdsdPortStr != NULL)
			{
				pData->mdsdPort = atoi(mdsdPortStr);
				free(mdsdPortStr);
				mdsdPortStr = NULL;
			}
		}
		else {
			dbgprintf("omazuremds: program error, non-handled "
			  "param '%s'\n", actpblk.descr[i].name);
		}
    }

	if (pData->batchSize <= 0)
	{
		dbgprintf("omazuremds: action requires queue.dequeuebatchsize");
		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
	}

    if(tplName == NULL) {
		dbgprintf("omazuremds: action requires a template name");
		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
	}

	/* template string 0 is just a regular string */
	OMSRsetEntry(*ppOMSR, 0, (uchar*)tplName, OMSR_NO_RQD_TPL_OPTS);

CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
dbgprintf("omazuremds: ENDnewActInst\n");
ENDnewActInst


/* This will parse rsyslog conf file. It will check
 * for legacy formats. Because legacy formats are not supported,
 * it will report errors for legacy formats.
 */
BEGINparseSelectorAct
CODESTARTparseSelectorAct
CODE_STD_STRING_REQUESTparseSelectorAct(1)
	if(!strncmp((char*) p, ":omazuremds:", sizeof(":omazuremds:") - 1)) {
		errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
			"omazuremds supports only v6 config format, use: "
			"action(type=\"omazuremds\" ...)");
	}
	ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct


BEGINmodExit
CODESTARTmodExit
ENDmodExit

/* These defines module level parameters and configurations */
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES    /* for new config syntax */
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* To support the transactional interface! */
ENDqueryEtryPt


/* Reset config variables for this module to default values.
 */
static rsRetVal resetConfigVariables(uchar __attribute__((unused)) *pp, void __attribute__((unused)) *pVal)
{
	DEFiRet;
	RETiRet;
}

BEGINmodInit()   
CODESTARTmodInit
INITLegCnfVars
	*ipIFVersProvided = CURR_MOD_IF_VERSION; 
CODEmodInit_QueryRegCFSLineHdlr

    CHKiRet(objUse(errmsg, CORE_COMPONENT));
	INITChkCoreFeature(bCoreSupportsBatching, CORE_FEATURE_BATCHING);
	if(!bCoreSupportsBatching) {	
		errmsg.LogError(0, NO_ERRCODE, "omazuremds: batching is not supported. rsyslog core too old.");
		ABORT_FINALIZE(RS_RET_ERR);
	}

	
	CHKiRet(omsdRegCFSLineHdlr((uchar *)"resetconfigvariables", 1, eCmdHdlrCustomHandler,
				    resetConfigVariables, NULL, STD_LOADABLE_MODULE_ID));
ENDmodInit

Attachment: etc.tar
Description: Unix tar archive

_______________________________________________
rsyslog mailing list
http://lists.adiscon.net/mailman/listinfo/rsyslog
http://www.rsyslog.com/professional-services/
What's up with rsyslog? Follow https://twitter.com/rgerhards
NOTE WELL: This is a PUBLIC mailing list, posts are ARCHIVED by a myriad of 
sites beyond our control. PLEASE UNSUBSCRIBE and DO NOT POST if you DON'T LIKE 
THAT.

Reply via email to