Hi All,

I am trying to implement cache write using plugin cache.

After the null transform has made a cont call for the event
TS_EVENT_VCONN_WRITE_COMPLETE, I try to
do a cache write.
In order to handle these cases I create a queue of cache data. An
infinitely running thread (threadcacheWrite), created using
TSThreadCreate, checks whether the queue has data. If it has, I create
a cont for the cache write and perform the cache write (TSCacheWrite).
After the cache write is complete I close the VConn.
However, the closed vconn, instead of calling the same
function (cacheHandler), calls the null_transform function.
This function is again called when the null_transform vconn is also closed.
Also the data passed to null_transform function is same in both the cases.

Can anybody suggest why the cache vconn is going to the null transform
handler?
I checked this by using function (TSVConnClosedGet) and printing cont data.
The cont data printed is same in both the cases.
Due to this I am unable to free the Cont and it is resulting in memory
leaks.

I have also attached sample code in two files for the reference.

Am I missing something?

Regards
/*
 * Appends "&time=<now>&max-age=<max age>" to the buffered data, pushes the
 * cache record onto the write queue selected by the recording id, and then
 * blocks the calling thread until the record's status is no longer
 * CACHE_STATUS_NONE / CACHE_STATUS_OPEN_WRITE. Afterwards the per-request
 * condition variable, mutex, continuation and cache key are destroyed.
 *
 * f_ptrContp    - transformation continuation (not used in the visible part).
 * correlationId - request correlation id, stored in the cache record and
 *                 passed to the helper calls.
 *
 * NOTE(review): this listing is an excerpt. l_ptrCacheData and recordingId
 * are used without a visible declaration, and the inner closing brace near
 * the end closes a block whose opening is not shown.
 */
static void handle_transform(TSCont f_ptrContp, string* correlationId)
{
			l_ptrCacheData->cache_data->key = (TSCacheKey)cacheKeyCreate(recordingId, correlationId);
			l_ptrCacheData->cache_data->status = CACHE_STATUS_NONE;
			l_ptrCacheData->cache_data->correlationId=correlationId;
			int l_iRecordingId=atoi(recordingId.c_str());
			long l_iMaxAge=0;
			if(getMaxAge(l_ptrCacheData->cache_data->txn, l_iMaxAge, correlationId) != 0)
			{
				std::cerr<<"Unable to retrieve max age from server response";
			}
			string l_TimeFieldName="&time=";
			string l_MaxAgeFieldName="&max-age=";
			char l_cMaxAge[20];
			char l_currTime[20];
			time_t l_lCurrentTimeSeconds = time(NULL);
			// Bounded formatting: a 64-bit long can need 20 characters plus the
			// terminator, which would overflow these buffers with sprintf.
			snprintf(l_cMaxAge, sizeof(l_cMaxAge), "%ld", l_iMaxAge);
			// time_t is not guaranteed to be long; cast so it matches "%ld".
			snprintf(l_currTime, sizeof(l_currTime), "%ld", (long)l_lCurrentTimeSeconds);
			l_ptrCacheData->cache_data->bufferData->append(l_TimeFieldName);
			l_ptrCacheData->cache_data->bufferData->append(l_currTime);
			l_ptrCacheData->cache_data->bufferData->append(l_MaxAgeFieldName);
			l_ptrCacheData->cache_data->bufferData->append(l_cMaxAge);

			// The condition variable and mutex live on this stack frame, so this
			// function must not return before the cache writer is done with them.
			pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
			pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER;
			l_ptrCacheData->cache_data->cond = &cond;
			l_ptrCacheData->cache_data->cond_mutex = &cond_mutex;

			l_ptrCacheData->cache_data->status = CACHE_STATUS_NONE;
			// Take the per-request mutex before publishing the record so the
			// completion handler cannot change the status until we are waiting.
			pthread_mutex_lock(l_ptrCacheData->cache_data->cond_mutex); 
			pthread_mutex_lock(&g_writeQueueLock);
			getWriteCacheQueue(l_iRecordingId)->push(l_ptrCacheData->cache_data);
			pthread_mutex_unlock(&g_writeQueueLock);

			// Re-check the predicate in a loop: pthread_cond_wait may wake up
			// spuriously, and continuing early would destroy the condition
			// variable, mutex, continuation and key while the write is in flight.
			while(l_ptrCacheData->cache_data->status == CACHE_STATUS_NONE || l_ptrCacheData->cache_data->status == CACHE_STATUS_OPEN_WRITE)
			{
				pthread_cond_wait(l_ptrCacheData->cache_data->cond, l_ptrCacheData->cache_data->cond_mutex); // Wait for Cache Key to Destroy
			}
			pthread_mutex_unlock(l_ptrCacheData->cache_data->cond_mutex);

			pthread_mutex_destroy(l_ptrCacheData->cache_data->cond_mutex);
			pthread_cond_destroy(l_ptrCacheData->cache_data->cond);
			TSContDestroy(l_ptrCacheData->cache_data->cont);
			TSCacheKeyDestroy(l_ptrCacheData->cache_data->key);
		}
}

/*
 * Continuation handler for the transformation.
 *
 * f_ptrContp  - the transformation continuation; its data is a
 *               CacheTransformationData*.
 * f_ptrEvent  - event being delivered.
 * f_vptrData  - event data; this handler treats it as the TSHttpTxn.
 *
 * When the vconnection is reported closed, the continuation data is
 * destroyed and detached from the continuation. Otherwise the event is
 * dispatched: errors are forwarded to the input VIO's continuation, write
 * completion closes the output vconnection, and WRITE_READY / IMMEDIATE run
 * handle_transform. Always returns 0.
 */
int null_transform(TSCont f_ptrContp, TSEvent f_ptrEvent, void *f_vptrData)
{
	Logger::getInstance()->writeAppLog(LOG_INFO, "Enter %s", __FUNCTION__);
	TSHttpTxn l_txnp = (TSHttpTxn) f_vptrData;
	CacheTransformationData *l_ptrCacheData = (CacheTransformationData *)TSContDataGet(f_ptrContp);
	Log("null transform transaction object is %p", l_txnp);
	Log("null transform cont data is %p", TSContDataGet(f_ptrContp));
	Log("null transform cont is %p", f_ptrContp);

	// No data attached (never set, or already released by an earlier
	// "closed" callback): there is nothing to dereference or free.
	if (l_ptrCacheData == NULL)
	{
		return 0;
	}
	string* correlationId = l_ptrCacheData->correlationId;

	/* Check to see if the transformation has been closed by a call to
	* TSVConnClose.
	*/
	if (TSVConnClosedGet(f_ptrContp))
	{
		Log("CORRELATION_ID %s, VConn is closed.. Destroying cont data %p", correlationId->c_str(), TSContDataGet(f_ptrContp));
		TransformDataDestroy(l_ptrCacheData, correlationId);
		// Detach the freed data so that a second callback on the closed
		// vconnection cannot destroy it again.
		TSContDataSet(f_ptrContp, NULL);
		// NOTE(review): the continuation itself is not destroyed here (the
		// TSContDestroy call was disabled by the author) - confirm who is
		// expected to release it.
		return 0;
	}

	switch (f_ptrEvent)
	{
		case TS_EVENT_ERROR:
		{
			// Pass the error on to whoever is writing into this transformation.
			TSVIO l_ptrInputVio = TSVConnWriteVIOGet(f_ptrContp);
			TSContCall(TSVIOContGet(l_ptrInputVio), TS_EVENT_ERROR, l_ptrInputVio);
		}
		break;
		case TS_EVENT_VCONN_WRITE_COMPLETE:
			// NOTE(review): the stock null-transform sample shuts down the
			// write side with TSVConnShutdown(..., 0, 1) here instead of
			// closing the output vconnection - confirm TSVConnClose is intended.
			TSVConnClose(TSTransformOutputVConnGet(f_ptrContp));
			break;

		case TS_EVENT_VCONN_WRITE_READY:
		case TS_EVENT_IMMEDIATE:
			handle_transform(f_ptrContp, correlationId);
			break;
		default:
			TSHttpTxnReenable(l_txnp, TS_EVENT_HTTP_CONTINUE);
			break;
	}
	return 0;
}
/*
 * Continuation handler that drives one plugin cache write.
 *
 * f_contp  - cache-write continuation; its data is the CacheData record.
 * f_event  - event being delivered.
 * f_pEdata - event data; for TS_EVENT_CACHE_OPEN_WRITE it is the cache
 *            vconnection.
 *
 * TS_EVENT_CACHE_OPEN_WRITE copies the buffered data into a new IO buffer and
 * starts the write on the cache vconnection. TS_EVENT_VCONN_WRITE_COMPLETE
 * releases the IO buffer, closes the cache vconnection, marks the record as
 * CACHE_STATUS_VCONN_WRITE_COMPLETE and wakes the thread waiting on the
 * record's condition variable. Any other event is only logged.
 *
 * Returns TS_ERROR when the data cannot be copied into the IO buffer,
 * TS_SUCCESS otherwise.
 */
int cacheHandler(TSCont f_contp, TSEvent f_event, void *f_pEdata)
{
	CacheData* l_pCacheData=(CacheData*)TSContDataGet(f_contp);
	string* correlationId=l_pCacheData->correlationId;

	switch(f_event){
		case TS_EVENT_CACHE_OPEN_WRITE:
		{
			// Measure the payload once; the same length is used for the copy
			// and for the write request.
			const char* l_pPayload=l_pCacheData->bufferData->c_str();
			size_t l_payloadLen=strlen(l_pPayload);

			l_pCacheData->cacheVc=(TSVConn)f_pEdata;
			l_pCacheData->cacheWriteBuffer=TSIOBufferCreate();
			if(TSIOBufferWrite(l_pCacheData->cacheWriteBuffer, l_pPayload, l_payloadLen) == TS_ERROR)
			{
				// Do not leak the buffer that was just created.
				// NOTE(review): the cache vconnection stays open and the
				// waiting thread is not woken on this path - confirm how
				// callers are expected to recover.
				TSIOBufferDestroy(l_pCacheData->cacheWriteBuffer);
				l_pCacheData->cacheWriteBuffer=NULL;
				return TS_ERROR;
			}
			l_pCacheData->cacheWriteBufferReader=TSIOBufferReaderAlloc(l_pCacheData->cacheWriteBuffer);
			l_pCacheData->cacheWriteVio=TSVConnWrite(l_pCacheData->cacheVc, f_contp, l_pCacheData->cacheWriteBufferReader, l_payloadLen);
			break;
		}
		case TS_EVENT_VCONN_WRITE_COMPLETE:
			pthread_mutex_lock(l_pCacheData->cond_mutex);
			l_pCacheData->status = CACHE_STATUS_VCONN_WRITE_COMPLETE;
			TSIOBufferReaderFree(l_pCacheData->cacheWriteBufferReader);
			TSIOBufferDestroy(l_pCacheData->cacheWriteBuffer);
			l_pCacheData->cacheWriteVio=NULL;
			TSVConnClose(l_pCacheData->cacheVc);
			l_pCacheData->cacheVc = NULL;
			// Signal while still holding the mutex: the waiter in
			// handle_transform destroys the condition variable as soon as it
			// resumes, so broadcasting after the unlock could touch a
			// destroyed object.
			pthread_cond_broadcast(l_pCacheData->cond);
			pthread_mutex_unlock(l_pCacheData->cond_mutex);
			break;
		default:
			Log("CORRELATION_ID: %s, %s : In Default", correlationId->c_str(), __FUNCTION__);
			break;
		}
	Log("CORRELATION_ID: %s, Exit %s", correlationId->c_str(), __FUNCTION__);
	return TS_SUCCESS;
}

/*
 * Worker thread body: drains one cache-write queue forever.
 *
 * arg - pointer to an int holding the queue number passed to
 *       getWriteCacheQueue.
 *
 * For every queued CacheData record it creates a continuation bound to
 * cacheHandler, attaches the record to it, and starts the cache write.
 * The loop never exits, so the trailing return is not reached.
 */
void* threadcacheWrite(void *arg)
{
	int l_iQueueNo=*((int*)arg);
	queue<CacheData*>* l_pWriteQueue = getWriteCacheQueue(l_iQueueNo); 
	while(1)
	{
		CacheData* cacheData = NULL;

		// Hold the queue lock only long enough to take one record, so that
		// producers are not blocked while the cache write is being started.
		pthread_mutex_lock(&g_writeQueueLock);
		if(!l_pWriteQueue->empty())
		{
			cacheData = l_pWriteQueue->front();
			l_pWriteQueue->pop();
		}
		pthread_mutex_unlock(&g_writeQueueLock);

		if(cacheData == NULL)
		{
			// Nothing queued: give up the CPU instead of immediately
			// re-taking the lock. sched_yield is made visible by <pthread.h>.
			sched_yield();
			continue;
		}

		TSCont contp = TSContCreate(cacheHandler, cacheData->mutex);
		cacheData->cont=contp;
		TSContDataSet(contp, cacheData);
		TSCacheWrite(contp, cacheData->key);
	}
	return NULL;
}

Reply via email to