osaf/services/saf/logsv/lgs/lgs_evt.c | 8 +-
osaf/services/saf/logsv/lgs/lgs_file.c | 199 ++++++++++++++++++++++--------
osaf/services/saf/logsv/lgs/lgs_stream.c | 16 ++-
3 files changed, 161 insertions(+), 62 deletions(-)
- Remove unnecessary data copying in log_file_api() and file_hndl_thread()
- Return SA_AIS_ERR_TIMEOUT if the write operation times out when a
log record is to be written. If the file thread is already "hanging"
when a write is requested, no attempt to write is made and SA_AIS_ERR_TRY_AGAIN
is returned as before.
- Try to recover file thread by recreating it if it hangs for a long time.
- Recover if a write fails because of a bad file descriptor or a stale NFS handle.
diff --git a/osaf/services/saf/logsv/lgs/lgs_evt.c
b/osaf/services/saf/logsv/lgs/lgs_evt.c
--- a/osaf/services/saf/logsv/lgs/lgs_evt.c
+++ b/osaf/services/saf/logsv/lgs/lgs_evt.c
@@ -1023,7 +1023,7 @@ static uint32_t proc_write_log_async_msg
SaAisErrorT error = SA_AIS_OK;
SaStringT logOutputString = NULL;
SaUint32T buf_size;
- int n;
+ int n, rc;
TRACE_ENTER2("client_id %u, stream ID %u", param->client_id,
param->lstr_id);
@@ -1064,9 +1064,13 @@ static uint32_t proc_write_log_async_msg
goto done;
}
- if (log_stream_write_h(stream, logOutputString, n) == -1) {
+ rc = log_stream_write_h(stream, logOutputString, n);
+ if (rc == -1) {
error = SA_AIS_ERR_TRY_AGAIN;
goto done;
+ } else if (rc == -2) {
+ error = SA_AIS_ERR_TIMEOUT;
+ goto done;
}
/*
diff --git a/osaf/services/saf/logsv/lgs/lgs_file.c
b/osaf/services/saf/logsv/lgs/lgs_file.c
--- a/osaf/services/saf/logsv/lgs/lgs_file.c
+++ b/osaf/services/saf/logsv/lgs/lgs_file.c
@@ -34,7 +34,10 @@
#include "osaf_utility.h"
+/* Max time to wait for file thread to finish */
#define MAX_WAITTIME_ms 500 /* ms */
+/* Max time to wait for hanging file thread before recovery */
+#define MAX_RECOVERYTIME_s 600 /* s */
#define GETTIME(x) osafassert(clock_gettime(CLOCK_REALTIME, &x) == 0);
static pthread_mutex_t lgs_ftcom_mutex; /* For locking communication */
@@ -47,12 +50,14 @@ struct file_communicate {
bool timeout_f; /* True if API has got a timeout. Thread shall not
answer */
lgsf_treq_t request_code; /* Request code from API */
int return_code; /* Return code from handlers */
- size_t indata_size;
- void *indata; /* In-parameters for handlers */
+ void *indata_ptr; /* In-parameters for handlers */
size_t outdata_size;
- void *outdata; /* Out data from handlers */
+ void *outdata_ptr; /* Out data from handlers */
};
+/* Used for synchronizing and transfer of data ownership between main thread
+ * and file thread.
+ */
static struct file_communicate lgs_com_data = {
.answer_f = false,
.request_f = false,
@@ -60,10 +65,17 @@ static struct file_communicate lgs_com_d
.return_code = LGSF_NORETC
};
+static pthread_t file_thread_id;
+static struct timespec ftr_start_time; /* Start time used for file thread
recovery */
+static bool ftr_started_flag = false; /* Set to true if thread is hanging */
+
/*****************************************************************************
* Utility functions
*****************************************************************************/
+static int start_file_thread(void);
+static void remove_file_thread(void);
+
/**
* Creates absolute time to use with pthread_cond_timedwait.
*
@@ -88,6 +100,42 @@ static void get_timeout_time(struct time
timeout_time->tv_nsec = (millisec2 % 1000) * 1000000;
}
+/**
+ * Checks if it is time to recover the file thread. On timeout, do the recovery.
+ * Global variables:
+ * ftr_start_time; Time saved when file thread was timed out.
+ * ftr_started_flag; Set to true when recovery timeout shall be
measured.
+ *
+ */
+static void ft_check_recovery(void)
+{
+ struct timespec end_time;
+ uint64_t stime_ms, etime_ms, dtime_ms;
+ int rc;
+
+ TRACE_ENTER2("ftr_started_flag = %d",ftr_started_flag);
+ if (ftr_started_flag == true) {
+ /* Calculate elapsed time */
+ GETTIME(end_time);
+ stime_ms = (ftr_start_time.tv_sec * 1000) +
(ftr_start_time.tv_nsec / 1000000);
+ etime_ms = (end_time.tv_sec * 1000) + (end_time.tv_nsec /
1000000);
+ dtime_ms = etime_ms - stime_ms;
+
+ TRACE("dtime_ms = %ld",dtime_ms);
+
+ if (dtime_ms >= (MAX_RECOVERYTIME_s * 1000)) {
+ TRACE("Recovering file thread");
+ remove_file_thread();
+ rc = start_file_thread();
+ if (rc) {
+ LOG_ER("File thread could not be recovered.
Exiting...");
+ _Exit(EXIT_FAILURE);
+ }
+ }
+ }
+ TRACE_LEAVE();
+}
+
/*****************************************************************************
* Thread handling
*****************************************************************************/
@@ -106,17 +154,18 @@ static void *file_hndl_thread(void *nopa
{
int rc = 0;
int hndl_rc = 0;
- void *inbuf;
- void *outbuf;
- uint32_t max_outsize;
+ int dummy;
//#define LLD_DELAY_TST
#ifdef LLD_DELAY_TST /* Make "file system" hang for n sec after start */
static bool lld_start_f = true;
- const unsigned int lld_sleep_sec = 10;
+ const unsigned int lld_sleep_sec = 60;
#endif
TRACE("%s - is started",__FUNCTION__);
+ /* Configure cancellation so that thread can be canceled at any time */
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &dummy);
+ pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &dummy);
osaf_mutex_lock_ordie(&lgs_ftcom_mutex); /* LOCK */
while(1) {
@@ -126,21 +175,6 @@ static void *file_hndl_thread(void *nopa
if (rc != 0) osaf_abort(rc);
} else {
- /* Handle communication buffer */
- if (lgs_com_data.indata_size != 0) {
- inbuf = malloc(lgs_com_data.indata_size);
- memcpy(inbuf, lgs_com_data.indata,
lgs_com_data.indata_size);
- } else {
- inbuf = NULL;
- }
-
- if (lgs_com_data.outdata_size != 0) {
- outbuf = malloc(lgs_com_data.outdata_size);
- } else {
- outbuf = NULL;
- }
- max_outsize = lgs_com_data.outdata_size;
-
/* Handle the request.
* A handler is handling file operations that may
'hang'. Therefore
* the mutex cannot be locked since that may cause the
main thread
@@ -162,34 +196,44 @@ static void *file_hndl_thread(void *nopa
/* Invoke requested handler function */
switch (lgs_com_data.request_code) {
case LGSF_FILEOPEN:
- hndl_rc = fileopen_hdl(inbuf, outbuf,
max_outsize);
+ hndl_rc = fileopen_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
break;
case LGSF_FILECLOSE:
- hndl_rc = fileclose_hdl(inbuf, outbuf,
max_outsize);
+ hndl_rc = fileclose_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
break;
case LGSF_DELETE_FILE:
- hndl_rc = delete_file_hdl(inbuf, outbuf,
max_outsize);
+ hndl_rc =
delete_file_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
break;
case LGSF_GET_NUM_LOGFILES:
- hndl_rc = get_number_of_log_files_hdl(inbuf,
outbuf, max_outsize);
+ hndl_rc =
get_number_of_log_files_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
break;
case LGSF_MAKELOGDIR:
- hndl_rc = make_log_dir_hdl(inbuf, outbuf,
max_outsize);
+ hndl_rc =
make_log_dir_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
break;
case LGSF_WRITELOGREC:
- hndl_rc = write_log_record_hdl(inbuf, outbuf,
max_outsize);
+ hndl_rc =
write_log_record_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
break;
case LGSF_CREATECFGFILE:
- hndl_rc = create_config_file_hdl(inbuf, outbuf,
max_outsize);
+ hndl_rc =
create_config_file_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
break;
case LGSF_RENAME_FILE:
- hndl_rc = rename_file_hdl(inbuf, outbuf,
max_outsize);
+ hndl_rc =
rename_file_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
break;
case LGSF_CHECKPATH:
- hndl_rc = check_path_exists_hdl(inbuf, outbuf,
max_outsize);
+ hndl_rc =
check_path_exists_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
break;
case LGSF_CHECKDIR:
- hndl_rc = path_is_writeable_dir_hdl(inbuf,
outbuf, max_outsize);
+ hndl_rc =
path_is_writeable_dir_hdl(lgs_com_data.indata_ptr,
+ lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
default:
break;
}
@@ -206,24 +250,15 @@ static void *file_hndl_thread(void *nopa
*/
lgs_com_data.request_f = false; /* Prepare to take a
new request */
lgs_com_data.request_code = LGSF_NOREQ;
- free(inbuf);
/* The following cannot be done if the API has timed
out */
if (lgs_com_data.timeout_f == false) {
lgs_com_data.answer_f = true;
lgs_com_data.return_code = hndl_rc;
- if (lgs_com_data.outdata_size != 0) {
- memcpy(lgs_com_data.outdata, outbuf,
lgs_com_data.outdata_size);
- free(outbuf);
- } else {
- lgs_com_data.outdata = NULL;
- }
/* Signal the API function that we are done */
rc = pthread_cond_signal(&answer_cv);
if (rc != 0) osaf_abort(rc);
- } else {
- free(outbuf);
}
}
} /* End while(1) */
@@ -238,7 +273,6 @@ static int start_file_thread(void)
{
int rc = 0;
int tbd_inpar=1;
- pthread_t thread;
TRACE_ENTER();
@@ -261,7 +295,7 @@ static int start_file_thread(void)
/* Create thread.
*/
- rc = pthread_create(&thread, NULL, file_hndl_thread, (void *)
&tbd_inpar);
+ rc = pthread_create(&file_thread_id, NULL, file_hndl_thread, (void *)
&tbd_inpar);
if (rc != 0) {
LOG_ER("pthread_create fail %s",strerror(errno));
goto done;
@@ -273,6 +307,44 @@ done:
}
/**
+ * Remove and cleanup file thread
+ */
+static void remove_file_thread(void)
+{
+ int rc;
+
+ TRACE_ENTER();
+
+ /* Remove the thread */
+ rc = pthread_cancel(file_thread_id);
+ if (rc) {
+ TRACE("pthread_cancel fail - %s",strerror(rc));
+ }
+
+ /* Cleanup mutex and conditions */
+ rc = pthread_cond_destroy(&request_cv);
+ if (rc) {
+ TRACE("pthread_cond_destroy, request_cv fail -
%s",strerror(rc));
+ }
+ rc = pthread_cond_destroy(&answer_cv);
+ if (rc) {
+ TRACE("pthread_cond_destroy, answer_cv fail - %s",strerror(rc));
+ }
+ rc = pthread_mutex_destroy(&lgs_ftcom_mutex);
+ if (rc) {
+ TRACE("pthread_cond_destroy, answer_cv fail - %s",strerror(rc));
+ }
+
+ /* Clean up thread synchronization */
+ lgs_com_data.answer_f = false;
+ lgs_com_data.request_f = false;
+ lgs_com_data.request_code = LGSF_NOREQ;
+ lgs_com_data.return_code = LGSF_NORETC;
+
+ TRACE_LEAVE();
+}
+
+/**
* Initialize threaded file handling
*/
uint32_t lgs_file_init(void)
@@ -300,7 +372,8 @@ lgsf_retcode_t log_file_api(lgsf_apipar_
{
lgsf_retcode_t api_rc = LGSF_SUCESS;
int rc = 0;
- struct timespec timeout_time, start_time, end_time;
+ struct timespec timeout_time;
+ struct timespec m_start_time, m_end_time;
uint64_t stime_ms, etime_ms, dtime_ms;
TRACE_ENTER();
@@ -319,17 +392,16 @@ lgsf_retcode_t log_file_api(lgsf_apipar_
/* Enter data for a request */
lgs_com_data.request_code = apipar_in->req_code_in;
if (apipar_in->data_in_size != 0) {
- lgs_com_data.indata = malloc(apipar_in->data_in_size);
- memcpy(lgs_com_data.indata, apipar_in->data_in,
apipar_in->data_in_size);
+ lgs_com_data.indata_ptr = malloc(apipar_in->data_in_size);
+ memcpy(lgs_com_data.indata_ptr, apipar_in->data_in,
apipar_in->data_in_size);
} else {
- lgs_com_data.indata = NULL;
+ lgs_com_data.indata_ptr = NULL;
}
- lgs_com_data.indata_size = apipar_in->data_in_size;
if (apipar_in->data_out_size != 0) {
- lgs_com_data.outdata = malloc(apipar_in->data_out_size);
+ lgs_com_data.outdata_ptr = malloc(apipar_in->data_out_size);
} else {
- lgs_com_data.outdata = NULL;
+ lgs_com_data.outdata_ptr = NULL;
}
lgs_com_data.outdata_size = apipar_in->data_out_size;
@@ -341,7 +413,7 @@ lgsf_retcode_t log_file_api(lgsf_apipar_
if (rc != 0) osaf_abort(rc);
/* Wait for an answer */
- GETTIME(start_time); /* Used for TRACE of print of time to answer */
+ GETTIME(m_start_time); /* Used for TRACE of print of time to answer */
get_timeout_time(&timeout_time, MAX_WAITTIME_ms);
@@ -352,6 +424,9 @@ lgsf_retcode_t log_file_api(lgsf_apipar_
TRACE("Timed out before answer");
api_rc = LGSF_TIMEOUT;
lgs_com_data.timeout_f = true; /* Inform thread about
timeout */
+ /* Set start time for thread recovery timeout */
+ GETTIME(ftr_start_time);
+ ftr_started_flag = true; /* Switch on timeout check */
goto done;
} else if (rc != 0) {
TRACE("pthread wait Failed - %s",strerror(rc));
@@ -366,12 +441,12 @@ lgsf_retcode_t log_file_api(lgsf_apipar_
* the returned data.
*/
apipar_in->hdl_ret_code_out = lgs_com_data.return_code;
- memcpy(apipar_in->data_out, lgs_com_data.outdata,
lgs_com_data.outdata_size);
+ memcpy(apipar_in->data_out, lgs_com_data.outdata_ptr,
lgs_com_data.outdata_size);
/* Measure answer time for TRACE */
- GETTIME(end_time);
- stime_ms = (start_time.tv_sec * 1000) + (start_time.tv_nsec / 1000000);
- etime_ms = (end_time.tv_sec * 1000) + (end_time.tv_nsec / 1000000);
+ GETTIME(m_end_time);
+ stime_ms = (m_start_time.tv_sec * 1000) + (m_start_time.tv_nsec /
1000000);
+ etime_ms = (m_end_time.tv_sec * 1000) + (m_end_time.tv_nsec / 1000000);
dtime_ms = etime_ms - stime_ms;
TRACE("Time waited for answer %ld ms",dtime_ms);
@@ -379,13 +454,23 @@ lgsf_retcode_t log_file_api(lgsf_apipar_
lgs_com_data.answer_f = false;
lgs_com_data.return_code = LGSF_NORETC;
+ /* We are not hanging. Switch off recovery timer if armed */
+ ftr_started_flag = false;
+
done:
/* Prepare for new request/answer cycle */
- if (lgs_com_data.indata != NULL) free(lgs_com_data.indata);
- if (lgs_com_data.outdata != NULL) free(lgs_com_data.outdata);
+ if (lgs_com_data.indata_ptr != NULL) free(lgs_com_data.indata_ptr);
+ if (lgs_com_data.outdata_ptr != NULL) free(lgs_com_data.outdata_ptr);
api_exit:
osaf_mutex_unlock_ordie(&lgs_ftcom_mutex); /* UNLOCK */
+ /* If thread is hanging, check how long it has been hanging
+ * by reading the time and comparing it with the start time for hanging.
+ * If too long, reset the thread. Note: This must be done here after the
mutex
+ * is unlocked.
+ */
+ ft_check_recovery();
+
TRACE_LEAVE();
return api_rc;
}
diff --git a/osaf/services/saf/logsv/lgs/lgs_stream.c
b/osaf/services/saf/logsv/lgs/lgs_stream.c
--- a/osaf/services/saf/logsv/lgs/lgs_stream.c
+++ b/osaf/services/saf/logsv/lgs/lgs_stream.c
@@ -893,7 +893,9 @@ done:
* @param buf
* @param count
*
- * @return int -1 on error, 0 otherwise
+ * @return int 0 No error
+ * -1 on error
+ * -2 Write failed because of write timeout
*/
int log_stream_write_h(log_stream_t *stream, const char *buf, size_t count)
{
@@ -957,7 +959,10 @@ int log_stream_write_h(log_stream_t *str
apipar.data_out = (void *) &write_errno;
api_rc = log_file_api(&apipar);
- if (api_rc != LGSF_SUCESS) {
+ if (api_rc == LGSF_TIMEOUT) {
+ TRACE("%s - API error
%s",__FUNCTION__,lgsf_retcode_str(api_rc));
+ rc = -2;
+ } else if (api_rc != LGSF_SUCESS) {
TRACE("%s - API error
%s",__FUNCTION__,lgsf_retcode_str(api_rc));
rc = -1;
} else {
@@ -975,8 +980,13 @@ int log_stream_write_h(log_stream_t *str
LOG_IN("write '%s' failed \"%s\"", stream->logFileCurrent,
strerror(write_errno));
- if (write_errno == EBADF) {
+ /* If writing failed because of invalid file descriptor or
+ * stale NFS handle then invalidate the stream file descriptor.
+ * This will result in reopening of stream files.
+ */
+ if ((write_errno == EBADF) || (write_errno == ESTALE)) {
stream->fd = -1;
+ TRACE("Recovery because of \"%s\"
initiated",strerror(write_errno));
}
goto done;
}
------------------------------------------------------------------------------
Get 100% visibility into Java/.NET code with AppDynamics Lite!
It's a free troubleshooting tool designed for production.
Get down to code-level detail for bottlenecks, with <2% overhead.
Download for free and get started troubleshooting in minutes.
http://pubads.g.doubleclick.net/gampad/clk?id=48897031&iu=/4140/ostg.clktrk
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel