To avoid the risk of losing traces, the logtrace server should flush traces
as soon as possible when the logtrace owner has terminated.
This patch also refactors LogServer::Run() into ProcessRecvData() and PeriodicFlush().
---
src/dtm/transport/log_server.cc | 123 ++++++++++++++++++++------------
src/dtm/transport/log_server.h | 7 ++
2 files changed, 83 insertions(+), 47 deletions(-)
diff --git a/src/dtm/transport/log_server.cc b/src/dtm/transport/log_server.cc
index 201ed2695..fe27cb691 100644
--- a/src/dtm/transport/log_server.cc
+++ b/src/dtm/transport/log_server.cc
@@ -31,6 +31,15 @@
const Osaflog::ClientAddressConstantPrefix LogServer::address_header_{};
+// Reports whether /proc/<pid> exists; false only on ENOENT for a numeric pid.
+static bool is_pid_alive(const std::string& pid) {
+  // `pid` is taken from a received log record: keep non-digits out of the
+  // path and treat them as "unknown", which callers handle as alive.
+  if (pid.find_first_not_of("0123456789") != std::string::npos) return true;
+  struct stat sts;
+  return !(stat(("/proc/" + pid).c_str(), &sts) == -1 && errno == ENOENT);
+}
+
LogServer::LogServer(int term_fd)
: term_fd_{term_fd},
max_backups_{9},
@@ -46,64 +55,84 @@ LogServer::LogServer(int term_fd)
LogServer::~LogServer() {
for (const auto& s : log_streams_) delete s.second;
log_streams_.clear();
+ stream_pid_map_.clear();  // also drop the stream name -> owner pid records
}
void LogServer::Run() {
struct pollfd pfd[2] = {{term_fd_, POLLIN, 0}, {log_socket_.fd(), POLLIN,
0}};
do {
- for (int i = 0; i < 256; ++i) {
- char* buffer = current_stream_->current_buffer_position();
- struct sockaddr_un src_addr;
- socklen_t addrlen = sizeof(src_addr);
- ssize_t result = log_socket_.RecvFrom(buffer, LogWriter::kMaxMessageSize,
- &src_addr, &addrlen);
- if (result < 0) break;
- if (result == 0 || buffer[0] != '?') {
- while (result != 0 && buffer[result - 1] == '\n') --result;
- if (static_cast<size_t>(result) != LogWriter::kMaxMessageSize) {
- buffer[result++] = '\n';
- } else {
- buffer[result - 1] = '\n';
- }
- size_t msg_id_size;
- const char* msg_id = Osaflog::GetField(buffer, result, 5,
&msg_id_size);
- if (msg_id == nullptr) continue;
- LogStream* stream = GetStream(msg_id, msg_id_size);
- if (stream == nullptr) continue;
- if (stream != current_stream_) {
- memcpy(stream->current_buffer_position(), buffer, result);
- current_stream_ = stream;
- }
- current_stream_->Write(result);
- } else {
- ExecuteCommand(buffer, result, src_addr, addrlen);
- }
- }
-
+ ProcessRecvData();  // receive and dispatch pending datagrams
CloseIdleStreams();
+ PeriodicFlush();  // flush streams that are due or whose owner is gone
+ pfd[1].fd = log_socket_.fd();  // re-read the socket fd before each poll
+ // Poll with a fixed 30 ms timeout so that a terminated log stream owner
+ // is detected (and its stream flushed) as soon as possible.
+ osaf_ppoll(pfd, 2, &base::kThirtyMilliseconds, nullptr);
+ } while ((pfd[0].revents & POLLIN) == 0);  // stop once term_fd_ is readable
+}
- struct timespec current = base::ReadMonotonicClock();
- struct timespec last_flush = current;
- bool empty = true;
- for (const auto& s : log_streams_) {
- LogStream* stream = s.second;
- struct timespec flush = stream->last_flush();
- if ((current - flush) >= base::kFifteenSeconds) {
- stream->Flush();
+void LogServer::ProcessRecvData() {  // receive and handle at most 256 datagrams
+ for (int i = 0; i < 256; ++i) {
+ char* buffer = current_stream_->current_buffer_position();
+ struct sockaddr_un src_addr;
+ socklen_t addrlen = sizeof(src_addr);
+ ssize_t result = log_socket_.RecvFrom(buffer, LogWriter::kMaxMessageSize,
+ &src_addr, &addrlen);
+ if (result < 0) break;  // receive failed (presumably nothing left to read)
+ if (result == 0 || buffer[0] != '?') {  // no leading '?': a log record
+ while (result != 0 && buffer[result - 1] == '\n') --result;  // drop trailing newlines
+ if (static_cast<size_t>(result) != LogWriter::kMaxMessageSize) {
+ buffer[result++] = '\n';  // end the record with a single '\n'
} else {
- if (flush < last_flush) last_flush = flush;
+ buffer[result - 1] = '\n';  // buffer is full: overwrite the last byte instead
}
- if (!stream->empty()) empty = false;
+ size_t msg_id_size;
+ const char* msg_id = Osaflog::GetField(buffer, result, 5, &msg_id_size);  // field 5 selects the stream
+ if (msg_id == nullptr) continue;
+ LogStream* stream = GetStream(msg_id, msg_id_size);
+ if (stream == nullptr) continue;
+ stream_pid_map_[stream->name()] = ExtractPid(buffer, result);  // remember the pid that last wrote to this stream
+ if (stream != current_stream_) {
+ memcpy(stream->current_buffer_position(), buffer, result);  // data was received into the old current stream's buffer
+ current_stream_ = stream;
+ }
+ current_stream_->Write(result);
+ } else {
+ ExecuteCommand(buffer, result, src_addr, addrlen);  // leading '?': a command
+ }
+ }
+}
+
+void LogServer::PeriodicFlush() {  // flush due or orphaned streams; drop orphans
+  struct timespec current = base::ReadMonotonicClock();
+  for (auto it = log_streams_.begin(); it != log_streams_.end();) {
+    const auto entry = it++;  // step first: erase() invalidates only `entry`
+    LogStream* stream = entry->second;
+    struct timespec flush = stream->last_flush();
+    bool is_owner_alive = is_stream_owner_alive(stream->name());
+    if (((current - flush) >= base::kFifteenSeconds) || !is_owner_alive) {
+      stream->Flush();  // 15 s since last flush, or the owner is gone
}
- struct timespec timeout = (last_flush + base::kFifteenSeconds) - current;
- struct timespec* poll_timeout = &timeout;
- if (empty && log_streams_.size() > 1) {
- uint64_t max_idle = max_idle_time_.tv_sec;
- poll_timeout = (max_idle) ? &max_idle_time_ : nullptr;
+    if (is_owner_alive == false) {  // NOTE(review): erased but never deleted?
+      stream_pid_map_.erase(entry->first);  // use the key before its node dies
+      log_streams_.erase(entry);  // NOTE(review): current_stream_ may be `stream`
}
- pfd[1].fd = log_socket_.fd();
- osaf_ppoll(pfd, 2, poll_timeout, nullptr);
- } while ((pfd[0].revents & POLLIN) == 0);
+  }
+}
+
+// Tells whether the process recorded as owner of stream `name` still exists.
+// A stream with no recorded pid is reported as alive.
+bool LogServer::is_stream_owner_alive(const std::string& name) {
+  const auto owner = stream_pid_map_.find(name);
+  if (owner == stream_pid_map_.end()) return true;
+  return is_pid_alive(owner->second);
+}
+
+// Returns field 4 of `msg` (used as the owner's pid), or "" if it is absent.
+std::string LogServer::ExtractPid(const char* msg, size_t size) {
+  size_t pid_size = 0;  // defined value even if GetField() finds nothing
+  const char* pid = Osaflog::GetField(msg, size, 4, &pid_size);
+  return pid != nullptr ? std::string{pid, pid_size} : std::string{};
}
void LogServer::CloseIdleStreams() {
diff --git a/src/dtm/transport/log_server.h b/src/dtm/transport/log_server.h
index 884a851e6..49e54784e 100644
--- a/src/dtm/transport/log_server.h
+++ b/src/dtm/transport/log_server.h
@@ -81,6 +81,12 @@ class LogServer {
struct timespec last_write_;
LogWriter log_writer_;
};
+
+ void ProcessRecvData();
+ void PeriodicFlush();
+ std::string ExtractPid(const char* msg, size_t size);
+ bool is_stream_owner_alive(const std::string& name);
+
LogStream* GetStream(const char* msg_id, size_t msg_id_size);
// Validate the log stream name, for security reasons. This method will check
// that the string, when used as a file name, does not traverse the directory
@@ -112,6 +118,7 @@ class LogServer {
base::UnixServerSocket log_socket_;
std::map<std::string, LogStream*> log_streams_;
+ std::map<std::string, std::string> stream_pid_map_{};
LogStream* current_stream_;
size_t no_of_log_streams_;
static const Osaflog::ClientAddressConstantPrefix address_header_;
--
2.17.1
_______________________________________________
Opensaf-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/opensaf-devel