Hi All,
I am trying to implement cache write using plugin cache.
After the null transform has made a cont call for the event
TS_EVENT_VCONN_WRITE_COMPLETE, I try to
do a cache write.
To handle these cases I create a queue of cache data. I check whether
the
queue has data in an infinitely running thread (threadcacheWrite), created
using TSThreadCreate. If it has, I create a cont for the cache write
and perform the cache write (TSCacheWrite).
After the cache write is complete I close the VConn.
However, when the cache VConn is closed, instead of calling the same
function (cacheHandler), it calls the null_transform function.
This function is again called when the null_transform vconn is also closed.
Also the data passed to null_transform function is same in both the cases.
Can anybody suggest why the cache VConn is going to the null transform
handler?
I checked this by using function (TSVConnClosedGet) and printing cont data.
The cont data printed is same in both the cases.
Due to this I am unable to free the Cont and it is resulting in memory
leaks.
I have also attached sample code in two files for the reference.
Am I missing something?
Regards
// Appends "&time=<now>&max-age=<max-age>" to the buffered payload, pushes the
// entry onto the write queue selected by the recording id, and then blocks on
// a per-call condition variable until it is signalled. Afterwards it destroys
// the mutex, the condition variable, the cache continuation and the cache key.
//
// f_ptrContp     transform continuation (not referenced in the visible lines)
// correlationId  request correlation id; stored in the cache entry and passed
//                to the key/max-age helpers
//
// NOTE(review): this is an excerpt. l_ptrCacheData and recordingId are not
// declared in the visible lines, and one of the two trailing closing braces
// has no visible opener, so part of the original function is missing here.
static void handle_transform(TSCont f_ptrContp, string* correlationId)
{
l_ptrCacheData->cache_data->key = (TSCacheKey)cacheKeyCreate(recordingId, correlationId);
l_ptrCacheData->cache_data->status = CACHE_STATUS_NONE;
l_ptrCacheData->cache_data->correlationId=correlationId;
int l_iRecordingId=atoi(recordingId.c_str());
long l_iMaxAge=0;
// A failure here is only reported; processing continues with l_iMaxAge
// possibly still 0.
if(getMaxAge(l_ptrCacheData->cache_data->txn, l_iMaxAge, correlationId) != 0)
std::cerr<<"Unable to retrieve max age from server response";
string l_TimeFieldName="&time=";
string l_MaxAgeFieldName="&max-age=";
// NOTE(review): sprintf is unbounded. Where long/time_t are 64-bit, a negative
// value needs 21 bytes including the terminator, one more than these buffers
// hold -- consider snprintf.
char l_cMaxAge[20];
char l_currTime[20];
time_t l_lCurrentTimeSeconds = time(NULL);
sprintf(l_cMaxAge,"%ld", l_iMaxAge);
sprintf(l_currTime, "%ld", l_lCurrentTimeSeconds);
// Payload suffix layout: "&time=" <current time> "&max-age=" <max age>.
l_ptrCacheData->cache_data->bufferData->append(l_TimeFieldName);
l_ptrCacheData->cache_data->bufferData->append(l_currTime);
l_ptrCacheData->cache_data->bufferData->append(l_MaxAgeFieldName);
l_ptrCacheData->cache_data->bufferData->append(l_cMaxAge);
// status was already set to CACHE_STATUS_NONE above and is set again below; no
// intervening write is visible in this excerpt.
l_ptrCacheData->cache_data->status = CACHE_STATUS_NONE;
// The condition variable and mutex are locals of this call; only their
// addresses are stored in the cache entry, so they are valid only until this
// function returns.
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t cond_mutex = PTHREAD_MUTEX_INITIALIZER;
l_ptrCacheData->cache_data->cond = &cond;
l_ptrCacheData->cache_data->cond_mutex = &cond_mutex;
l_ptrCacheData->cache_data->status = CACHE_STATUS_NONE;
// cond_mutex is taken before the entry is published on the queue and is held
// until pthread_cond_wait releases it; cacheHandler locks the same mutex
// before it updates status.
pthread_mutex_lock(l_ptrCacheData->cache_data->cond_mutex);
pthread_mutex_lock(&g_writeQueueLock);
getWriteCacheQueue(l_iRecordingId)->push(l_ptrCacheData->cache_data);
pthread_mutex_unlock(&g_writeQueueLock);
// NOTE(review): this is a single `if`, not a loop, so any wake-up (including a
// spurious one permitted by POSIX) proceeds to the teardown below without
// re-checking status.
// NOTE(review): the calling thread is blocked here until another thread
// signals the condition variable -- confirm this is acceptable for the thread
// that runs the transform.
if(l_ptrCacheData->cache_data->status == CACHE_STATUS_NONE || l_ptrCacheData->cache_data->status == CACHE_STATUS_OPEN_WRITE)
{
pthread_cond_wait(l_ptrCacheData->cache_data->cond, l_ptrCacheData->cache_data->cond_mutex); // Wait for Cache Key to Destroy
}
pthread_mutex_unlock(l_ptrCacheData->cache_data->cond_mutex);
// Teardown: the synchronisation objects first, then the continuation stored
// by the writer thread and the cache key created at the top of this function.
pthread_mutex_destroy(l_ptrCacheData->cache_data->cond_mutex);
pthread_cond_destroy(l_ptrCacheData->cache_data->cond);
TSContDestroy(l_ptrCacheData->cache_data->cont);
TSCacheKeyDestroy(l_ptrCacheData->cache_data->key);
}
}
/**
 * Continuation handler for the null transformation.
 *
 * If the transformation VConn has been closed, the per-transform data attached
 * to the continuation is destroyed and detached. Otherwise the event is
 * dispatched: errors are forwarded to the continuation of the input VIO, a
 * completed write closes the output VConn, WRITE_READY/IMMEDIATE run
 * handle_transform, and any other event re-enables the transaction.
 *
 * @param f_ptrContp  continuation this handler is attached to
 * @param f_ptrEvent  event being delivered
 * @param f_vptrData  event data; treated as a TSHttpTxn by the default branch
 * @return always 0
 */
int null_transform(TSCont f_ptrContp, TSEvent f_ptrEvent, void *f_vptrData)
{
    Logger::getInstance()->writeAppLog(LOG_INFO, "Enter %s", __FUNCTION__);
    // NOTE(review): the event data is only a transaction for hook events --
    // confirm it really is a TSHttpTxn whenever the default branch can run.
    TSHttpTxn l_txnp = (TSHttpTxn) f_vptrData;
    Log("null transform transaction object is %p", l_txnp);
    Log("null transform cont data is %p", TSContDataGet(f_ptrContp));
    Log("null transform cont is %p", f_ptrContp);

    CacheTransformationData *l_ptrCacheData = (CacheTransformationData *)TSContDataGet(f_ptrContp);
    if (l_ptrCacheData == NULL)
    {
        // The data was already destroyed by an earlier "closed" callback (or
        // was never attached). Touching it again would be a use-after-free or
        // a double free, so there is nothing left to do for this continuation.
        return 0;
    }
    string* correlationId = l_ptrCacheData->correlationId;

    /* Check to see if the transformation has been closed by a call to
     * TSVConnClose.
     */
    if (TSVConnClosedGet(f_ptrContp))
    {
        Log("CORRELATION_ID %s, VConn is closed.. Destroying cont data %p", correlationId->c_str(), l_ptrCacheData);
        TransformDataDestroy(l_ptrCacheData, correlationId);
        // Detach the now-dangling pointer so that a second "closed" callback
        // on this continuation takes the early return above instead of
        // destroying the same data twice.
        TSContDataSet(f_ptrContp, NULL);
        //TSContDestroy(f_ptrContp);
        //TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
        return 0;
    }

    switch (f_ptrEvent)
    {
    case TS_EVENT_ERROR:
        {
            // Propagate the error to whoever is writing into this transform.
            TSVIO l_ptrInputVio = TSVConnWriteVIOGet(f_ptrContp);
            TSContCall(TSVIOContGet(l_ptrInputVio), TS_EVENT_ERROR, l_ptrInputVio);
        }
        break;
    case TS_EVENT_VCONN_WRITE_COMPLETE:
        // The downstream write finished: close the output VConn.
        TSVConnClose(TSTransformOutputVConnGet(f_ptrContp));
        break;
    case TS_EVENT_VCONN_WRITE_READY:
    case TS_EVENT_IMMEDIATE:
        handle_transform(f_ptrContp, correlationId);
        break;
    default:
        TSHttpTxnReenable(l_txnp, TS_EVENT_HTTP_CONTINUE);
        break;
    }
    return 0;
}
int cacheHandler(TSCont f_contp, TSEvent f_event, void *f_pEdata)
{
CacheData* l_pCacheData=NULL;
string* correlationId=NULL;
l_pCacheData=(CacheData*)TSContDataGet(f_contp);
correlationId=l_pCacheData->correlationId;
switch(f_event){
case TS_EVENT_CACHE_OPEN_WRITE:
l_pCacheData->cacheVc=(TSVConn)f_pEdata;
l_pCacheData->cacheWriteBuffer=TSIOBufferCreate();
if(TSIOBufferWrite(l_pCacheData->cacheWriteBuffer, l_pCacheData->bufferData->c_str(), strlen(l_pCacheData->bufferData->c_str())) == TS_ERROR)
{
return TS_ERROR;
}
l_pCacheData->cacheWriteBufferReader=TSIOBufferReaderAlloc(l_pCacheData->cacheWriteBuffer);
l_pCacheData->cacheWriteVio=TSVConnWrite(l_pCacheData->cacheVc, f_contp, l_pCacheData->cacheWriteBufferReader, strlen(l_pCacheData->bufferData->c_str()));
break;
case TS_EVENT_VCONN_WRITE_COMPLETE:
pthread_mutex_lock(l_pCacheData->cond_mutex);
l_pCacheData->status = CACHE_STATUS_VCONN_WRITE_COMPLETE;
TSIOBufferReaderFree(l_pCacheData->cacheWriteBufferReader);
TSIOBufferDestroy(l_pCacheData->cacheWriteBuffer);
l_pCacheData->cacheWriteVio=NULL;
TSVConnClose(l_pCacheData->cacheVc);
l_pCacheData->cacheVc = NULL;
pthread_mutex_unlock(l_pCacheData->cond_mutex);
pthread_cond_broadcast(l_pCacheData->cond);
break;
default:
Log("CORRELATION_ID: %s, %s : In Default", correlationId->c_str(), __FUNCTION__);
break;
}
Log("CORRELATION_ID: %s, Exit %s", correlationId->c_str(), __FUNCTION__);
return TS_SUCCESS;
}
/**
 * Worker thread body: drains one cache-write queue forever.
 *
 * For every queued entry a continuation bound to cacheHandler is created,
 * stored in the entry, and used to start the cache write for the entry's key.
 *
 * The queue lock is held only while an entry is popped, so producers are not
 * blocked while the Traffic Server API is being called. When the queue is
 * empty the thread sleeps briefly instead of spinning on the lock.
 *
 * @param arg  pointer to an int holding the queue number to service
 * @return never returns in practice; NULL to satisfy the thread signature
 */
void* threadcacheWrite(void *arg)
{
    const int l_iQueueNo = *((int*)arg);
    queue<CacheData*>* l_pWriteQueue = getWriteCacheQueue(l_iQueueNo);

    while (1)
    {
        CacheData* cacheData = NULL;

        pthread_mutex_lock(&g_writeQueueLock);
        if (!l_pWriteQueue->empty())
        {
            cacheData = l_pWriteQueue->front();
            l_pWriteQueue->pop();
        }
        pthread_mutex_unlock(&g_writeQueueLock);

        if (cacheData == NULL)
        {
            // Nothing to do (or a NULL entry was queued): back off for 1 ms
            // rather than re-taking the lock in a tight loop. nanosleep comes
            // from <time.h>, which <pthread.h> makes visible.
            struct timespec l_idle;
            l_idle.tv_sec = 0;
            l_idle.tv_nsec = 1000000L;
            nanosleep(&l_idle, NULL);
            continue;
        }

        TSCont contp = TSContCreate(cacheHandler, cacheData->mutex);
        cacheData->cont = contp;
        TSContDataSet(contp, cacheData);
        TSCacheWrite(contp, cacheData->key);
    }
    return NULL;
}