osaf/services/saf/logsv/lgs/lgs_evt.c | 8 +- osaf/services/saf/logsv/lgs/lgs_file.c | 199 ++++++++++++++++++++++-------- osaf/services/saf/logsv/lgs/lgs_stream.c | 16 ++- 3 files changed, 161 insertions(+), 62 deletions(-)
- Remove unnecessary data copying in log_file_api() and file_hndl_thread() - Return SA_AIS_ERR_TIMEOUT if the write operation times out when a log record shall be written. If the file thread is already "hanging" when a write is requested, no attempt to write is made and SA_AIS_ERR_TRY_AGAIN is returned as before. - Try to recover file thread by recreating it if it hangs for a long time. - Recover from a bad file descriptor or stale NFS handle. diff --git a/osaf/services/saf/logsv/lgs/lgs_evt.c b/osaf/services/saf/logsv/lgs/lgs_evt.c --- a/osaf/services/saf/logsv/lgs/lgs_evt.c +++ b/osaf/services/saf/logsv/lgs/lgs_evt.c @@ -1023,7 +1023,7 @@ static uint32_t proc_write_log_async_msg SaAisErrorT error = SA_AIS_OK; SaStringT logOutputString = NULL; SaUint32T buf_size; - int n; + int n, rc; TRACE_ENTER2("client_id %u, stream ID %u", param->client_id, param->lstr_id); @@ -1064,9 +1064,13 @@ static uint32_t proc_write_log_async_msg goto done; } - if (log_stream_write_h(stream, logOutputString, n) == -1) { + rc = log_stream_write_h(stream, logOutputString, n); + if (rc == -1) { error = SA_AIS_ERR_TRY_AGAIN; goto done; + } else if (rc == -2) { + error = SA_AIS_ERR_TIMEOUT; + goto done; } /* diff --git a/osaf/services/saf/logsv/lgs/lgs_file.c b/osaf/services/saf/logsv/lgs/lgs_file.c --- a/osaf/services/saf/logsv/lgs/lgs_file.c +++ b/osaf/services/saf/logsv/lgs/lgs_file.c @@ -34,7 +34,10 @@ #include "osaf_utility.h" +/* Max time to wait for file thread to finish */ #define MAX_WAITTIME_ms 500 /* ms */ +/* Max time to wait for hanging file thread before recovery */ +#define MAX_RECOVERYTIME_s 600 /* s */ #define GETTIME(x) osafassert(clock_gettime(CLOCK_REALTIME, &x) == 0); static pthread_mutex_t lgs_ftcom_mutex; /* For locking communication */ @@ -47,12 +50,14 @@ struct file_communicate { bool timeout_f; /* True if API has got a timeout. 
Thread shall not answer */ lgsf_treq_t request_code; /* Request code from API */ int return_code; /* Return code from handlers */ - size_t indata_size; - void *indata; /* In-parameters for handlers */ + void *indata_ptr; /* In-parameters for handlers */ size_t outdata_size; - void *outdata; /* Out data from handlers */ + void *outdata_ptr; /* Out data from handlers */ }; +/* Used for synchronizing and transfer of data ownership between main thread + * and file thread. + */ static struct file_communicate lgs_com_data = { .answer_f = false, .request_f = false, @@ -60,10 +65,17 @@ static struct file_communicate lgs_com_d .return_code = LGSF_NORETC }; +static pthread_t file_thread_id; +static struct timespec ftr_start_time; /* Start time used for file thread recovery */ +static bool ftr_started_flag = false; /* Set to true if thread is hanging */ + /***************************************************************************** * Utility functions *****************************************************************************/ +static int start_file_thread(void); +static void remove_file_thread(void); + /** * Creates absolute time to use with pthread_cond_timedwait. * @@ -88,6 +100,42 @@ static void get_timeout_time(struct time timeout_time->tv_nsec = (millisec2 % 1000) * 1000000; } +/** + * Checks if time to recover the file thread. If timeout do the recovery + * Global variables: + * ftr_start_time; Time saved when file thread was timed out. + * ftr_start_flag; Set to true when recovery timeout shall be measured. 
+ * + */ +static void ft_check_recovery(void) +{ + struct timespec end_time; + uint64_t stime_ms, etime_ms, dtime_ms; + int rc; + + TRACE_ENTER2("ftr_started_flag = %d",ftr_started_flag); + if (ftr_started_flag == true) { + /* Calculate elapsed time */ + GETTIME(end_time); + stime_ms = (ftr_start_time.tv_sec * 1000) + (ftr_start_time.tv_nsec / 1000000); + etime_ms = (end_time.tv_sec * 1000) + (end_time.tv_nsec / 1000000); + dtime_ms = etime_ms - stime_ms; + + TRACE("dtime_ms = %ld",dtime_ms); + + if (dtime_ms >= (MAX_RECOVERYTIME_s * 1000)) { + TRACE("Recovering file thread"); + remove_file_thread(); + rc = start_file_thread(); + if (rc) { + LOG_ER("File thread could not be recovered. Exiting..."); + _Exit(EXIT_FAILURE); + } + } + } + TRACE_LEAVE(); +} + /***************************************************************************** * Thread handling *****************************************************************************/ @@ -106,17 +154,18 @@ static void *file_hndl_thread(void *nopa { int rc = 0; int hndl_rc = 0; - void *inbuf; - void *outbuf; - uint32_t max_outsize; + int dummy; //#define LLD_DELAY_TST #ifdef LLD_DELAY_TST /* Make "file system" hang for n sec after start */ static bool lld_start_f = true; - const unsigned int lld_sleep_sec = 10; + const unsigned int lld_sleep_sec = 60; #endif TRACE("%s - is started",__FUNCTION__); + /* Configure cancellation so that thread can be canceled at any time */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &dummy); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &dummy); osaf_mutex_lock_ordie(&lgs_ftcom_mutex); /* LOCK */ while(1) { @@ -126,21 +175,6 @@ static void *file_hndl_thread(void *nopa if (rc != 0) osaf_abort(rc); } else { - /* Handle communication buffer */ - if (lgs_com_data.indata_size != 0) { - inbuf = malloc(lgs_com_data.indata_size); - memcpy(inbuf, lgs_com_data.indata, lgs_com_data.indata_size); - } else { - inbuf = NULL; - } - - if (lgs_com_data.outdata_size != 0) { - outbuf = 
malloc(lgs_com_data.outdata_size); - } else { - outbuf = NULL; - } - max_outsize = lgs_com_data.outdata_size; - /* Handle the request. * A handler is handling file operations that may 'hang'. Therefore * the mutex cannot be locked since that may cause the main thread @@ -162,34 +196,44 @@ static void *file_hndl_thread(void *nopa /* Invoke requested handler function */ switch (lgs_com_data.request_code) { case LGSF_FILEOPEN: - hndl_rc = fileopen_hdl(inbuf, outbuf, max_outsize); + hndl_rc = fileopen_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); break; case LGSF_FILECLOSE: - hndl_rc = fileclose_hdl(inbuf, outbuf, max_outsize); + hndl_rc = fileclose_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); break; case LGSF_DELETE_FILE: - hndl_rc = delete_file_hdl(inbuf, outbuf, max_outsize); + hndl_rc = delete_file_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); break; case LGSF_GET_NUM_LOGFILES: - hndl_rc = get_number_of_log_files_hdl(inbuf, outbuf, max_outsize); + hndl_rc = get_number_of_log_files_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); break; case LGSF_MAKELOGDIR: - hndl_rc = make_log_dir_hdl(inbuf, outbuf, max_outsize); + hndl_rc = make_log_dir_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); break; case LGSF_WRITELOGREC: - hndl_rc = write_log_record_hdl(inbuf, outbuf, max_outsize); + hndl_rc = write_log_record_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); break; case LGSF_CREATECFGFILE: - hndl_rc = create_config_file_hdl(inbuf, outbuf, max_outsize); + hndl_rc = create_config_file_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); break; case LGSF_RENAME_FILE: - hndl_rc = rename_file_hdl(inbuf, outbuf, max_outsize); + hndl_rc = rename_file_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, 
lgs_com_data.outdata_size); break; case LGSF_CHECKPATH: - hndl_rc = check_path_exists_hdl(inbuf, outbuf, max_outsize); + hndl_rc = check_path_exists_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); break; case LGSF_CHECKDIR: - hndl_rc = path_is_writeable_dir_hdl(inbuf, outbuf, max_outsize); + hndl_rc = path_is_writeable_dir_hdl(lgs_com_data.indata_ptr, + lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); default: break; } @@ -206,24 +250,15 @@ static void *file_hndl_thread(void *nopa */ lgs_com_data.request_f = false; /* Prepare to take a new request */ lgs_com_data.request_code = LGSF_NOREQ; - free(inbuf); /* The following cannot be done if the API has timed out */ if (lgs_com_data.timeout_f == false) { lgs_com_data.answer_f = true; lgs_com_data.return_code = hndl_rc; - if (lgs_com_data.outdata_size != 0) { - memcpy(lgs_com_data.outdata, outbuf, lgs_com_data.outdata_size); - free(outbuf); - } else { - lgs_com_data.outdata = NULL; - } /* Signal the API function that we are done */ rc = pthread_cond_signal(&answer_cv); if (rc != 0) osaf_abort(rc); - } else { - free(outbuf); } } } /* End while(1) */ @@ -238,7 +273,6 @@ static int start_file_thread(void) { int rc = 0; int tbd_inpar=1; - pthread_t thread; TRACE_ENTER(); @@ -261,7 +295,7 @@ static int start_file_thread(void) /* Create thread. 
*/ - rc = pthread_create(&thread, NULL, file_hndl_thread, (void *) &tbd_inpar); + rc = pthread_create(&file_thread_id, NULL, file_hndl_thread, (void *) &tbd_inpar); if (rc != 0) { LOG_ER("pthread_create fail %s",strerror(errno)); goto done; @@ -273,6 +307,44 @@ done: } /** + * Remove and cleanup file thread + */ +static void remove_file_thread(void) +{ + int rc; + + TRACE_ENTER(); + + /* Remove the thread */ + rc = pthread_cancel(file_thread_id); + if (rc) { + TRACE("pthread_cancel fail - %s",strerror(rc)); + } + + /* Cleanup mutex and conditions */ + rc = pthread_cond_destroy(&request_cv); + if (rc) { + TRACE("pthread_cond_destroy, request_cv fail - %s",strerror(rc)); + } + rc = pthread_cond_destroy(&answer_cv); + if (rc) { + TRACE("pthread_cond_destroy, answer_cv fail - %s",strerror(rc)); + } + rc = pthread_mutex_destroy(&lgs_ftcom_mutex); + if (rc) { + TRACE("pthread_cond_destroy, answer_cv fail - %s",strerror(rc)); + } + + /* Clean up thread synchronization */ + lgs_com_data.answer_f = false; + lgs_com_data.request_f = false; + lgs_com_data.request_code = LGSF_NOREQ; + lgs_com_data.return_code = LGSF_NORETC; + + TRACE_LEAVE(); +} + +/** * Initialize threaded file handling */ uint32_t lgs_file_init(void) @@ -300,7 +372,8 @@ lgsf_retcode_t log_file_api(lgsf_apipar_ { lgsf_retcode_t api_rc = LGSF_SUCESS; int rc = 0; - struct timespec timeout_time, start_time, end_time; + struct timespec timeout_time; + struct timespec m_start_time, m_end_time; uint64_t stime_ms, etime_ms, dtime_ms; TRACE_ENTER(); @@ -319,17 +392,16 @@ lgsf_retcode_t log_file_api(lgsf_apipar_ /* Enter data for a request */ lgs_com_data.request_code = apipar_in->req_code_in; if (apipar_in->data_in_size != 0) { - lgs_com_data.indata = malloc(apipar_in->data_in_size); - memcpy(lgs_com_data.indata, apipar_in->data_in, apipar_in->data_in_size); + lgs_com_data.indata_ptr = malloc(apipar_in->data_in_size); + memcpy(lgs_com_data.indata_ptr, apipar_in->data_in, apipar_in->data_in_size); } else { - 
lgs_com_data.indata = NULL; + lgs_com_data.indata_ptr = NULL; } - lgs_com_data.indata_size = apipar_in->data_in_size; if (apipar_in->data_out_size != 0) { - lgs_com_data.outdata = malloc(apipar_in->data_out_size); + lgs_com_data.outdata_ptr = malloc(apipar_in->data_out_size); } else { - lgs_com_data.outdata = NULL; + lgs_com_data.outdata_ptr = NULL; } lgs_com_data.outdata_size = apipar_in->data_out_size; @@ -341,7 +413,7 @@ lgsf_retcode_t log_file_api(lgsf_apipar_ if (rc != 0) osaf_abort(rc); /* Wait for an answer */ - GETTIME(start_time); /* Used for TRACE of print of time to answer */ + GETTIME(m_start_time); /* Used for TRACE of print of time to answer */ get_timeout_time(&timeout_time, MAX_WAITTIME_ms); @@ -352,6 +424,9 @@ lgsf_retcode_t log_file_api(lgsf_apipar_ TRACE("Timed out before answer"); api_rc = LGSF_TIMEOUT; lgs_com_data.timeout_f = true; /* Inform thread about timeout */ + /* Set start time for thread recovery timeout */ + GETTIME(ftr_start_time); + ftr_started_flag = true; /* Switch on timeout check */ goto done; } else if (rc != 0) { TRACE("pthread wait Failed - %s",strerror(rc)); @@ -366,12 +441,12 @@ lgsf_retcode_t log_file_api(lgsf_apipar_ * the returned data. 
*/ apipar_in->hdl_ret_code_out = lgs_com_data.return_code; - memcpy(apipar_in->data_out, lgs_com_data.outdata, lgs_com_data.outdata_size); + memcpy(apipar_in->data_out, lgs_com_data.outdata_ptr, lgs_com_data.outdata_size); /* Measure answer time for TRACE */ - GETTIME(end_time); - stime_ms = (start_time.tv_sec * 1000) + (start_time.tv_nsec / 1000000); - etime_ms = (end_time.tv_sec * 1000) + (end_time.tv_nsec / 1000000); + GETTIME(m_end_time); + stime_ms = (m_start_time.tv_sec * 1000) + (m_start_time.tv_nsec / 1000000); + etime_ms = (m_end_time.tv_sec * 1000) + (m_end_time.tv_nsec / 1000000); dtime_ms = etime_ms - stime_ms; TRACE("Time waited for answer %ld ms",dtime_ms); @@ -379,13 +454,23 @@ lgsf_retcode_t log_file_api(lgsf_apipar_ lgs_com_data.answer_f = false; lgs_com_data.return_code = LGSF_NORETC; + /* We are not hanging. Switch off recovery timer if armed */ + ftr_started_flag = false; + done: /* Prepare for new request/answer cycle */ - if (lgs_com_data.indata != NULL) free(lgs_com_data.indata); - if (lgs_com_data.outdata != NULL) free(lgs_com_data.outdata); + if (lgs_com_data.indata_ptr != NULL) free(lgs_com_data.indata_ptr); + if (lgs_com_data.outdata_ptr != NULL) free(lgs_com_data.outdata_ptr); api_exit: osaf_mutex_unlock_ordie(&lgs_ftcom_mutex); /* UNLOCK */ + /* If thread is hanging, check for how long time it has been hanging + * by reading time and compare with start time for hanging. + * If too long reset thread. Note: This must be done here after the mutex + * is unlocked. 
+ */ + ft_check_recovery(); + TRACE_LEAVE(); return api_rc; } diff --git a/osaf/services/saf/logsv/lgs/lgs_stream.c b/osaf/services/saf/logsv/lgs/lgs_stream.c --- a/osaf/services/saf/logsv/lgs/lgs_stream.c +++ b/osaf/services/saf/logsv/lgs/lgs_stream.c @@ -893,7 +893,9 @@ done: * @param buf * @param count * - * @return int -1 on error, 0 otherwise + * @return int 0 No error + * -1 on error + * -2 Write failed because of write timeout */ int log_stream_write_h(log_stream_t *stream, const char *buf, size_t count) { @@ -957,7 +959,10 @@ int log_stream_write_h(log_stream_t *str apipar.data_out = (void *) &write_errno; api_rc = log_file_api(&apipar); - if (api_rc != LGSF_SUCESS) { + if (api_rc == LGSF_TIMEOUT) { + TRACE("%s - API error %s",__FUNCTION__,lgsf_retcode_str(api_rc)); + rc = -2; + } else if (api_rc != LGSF_SUCESS) { TRACE("%s - API error %s",__FUNCTION__,lgsf_retcode_str(api_rc)); rc = -1; } else { @@ -975,8 +980,13 @@ int log_stream_write_h(log_stream_t *str LOG_IN("write '%s' failed \"%s\"", stream->logFileCurrent, strerror(write_errno)); - if (write_errno == EBADF) { + /* If writing failed because of invalid file descriptor or + * staled NFS handle then invalidate the stream file descriptor. + * This will result in reopening of stream files. + */ + if ((write_errno == EBADF) || (write_errno == ESTALE)) { stream->fd = -1; + TRACE("Recovery because of \"%s\" initiated",strerror(write_errno)); } goto done; } ------------------------------------------------------------------------------ Get 100% visibility into Java/.NET code with AppDynamics Lite! It's a free troubleshooting tool designed for production. Get down to code-level detail for bottlenecks, with <2% overhead. Download for free and get started troubleshooting in minutes. 
http://pubads.g.doubleclick.net/gampad/clk?id=48897031&iu=/4140/ostg.clktrk _______________________________________________ Opensaf-devel mailing list Opensaf-devel@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/opensaf-devel