+ auto sock = std::unique_ptr<base::UnixServerSocket>(CreateSocket());
+
+ if (!sock) {
+ fprintf(stderr, "Failed to create UNIX domain socket\n");
+ return false;
+ }
+
+ struct sockaddr_un osaftransportd_addr;
+ socklen_t addrlen = base::UnixSocket::SetAddress(Osaflog::kServerSocketPath,
+ &osaftransportd_addr);
+
+ struct Osaflog::cmd_osaflog maxfilesize_cmd;
+ maxfilesize_cmd.marker[0]='?';
+ maxfilesize_cmd.m_cmd = Osaflog::MAXFILESIZE;
+ maxfilesize_cmd.m_value = maxfilesize;
+
+ ssize_t result = sock->SendTo(&maxfilesize_cmd, sizeof(maxfilesize_cmd),
&osaftransportd_addr, addrlen);
+ if (result < 0) {
+ perror("Failed to send message to osaftransportd");
+ return false;
+ } else if (static_cast<size_t>(result) != (sizeof(struct
Osaflog::cmd_osaflog))) {
+ fprintf(stderr, "Failed to send message to osaftransportd\n");
+ return false;
+ }
+
+ struct timespec end_time = base::ReadMonotonicClock() + base::kTenSeconds;
+ for (;;) {
+ struct pollfd fds {
+ sock->fd(), POLLIN, 0
+ };
+ struct timespec current_time = base::ReadMonotonicClock();
+ result = 0;
+ if (current_time >= end_time) {
+ fprintf(stderr, "Timeout\n");
+ return false;
+ }
+ struct timespec timeout = end_time - current_time;
+ result = ppoll(&fds, 1, &timeout, NULL);
+ if (result < 0) {
+ perror("Failed to wait for reply from osaftransportd");
+ return false;
+ } else if (result == 0) {
+ fprintf(stderr, "Timeout\n");
+ return false;
+ }
+ struct sockaddr_un sender_addr;
+ socklen_t sender_addrlen = sizeof(sender_addr);
+ result = sock->RecvFrom(buf, sizeof(buf), &sender_addr, &sender_addrlen);
+ if (result < 0) break;
+ if (sender_addrlen == addrlen &&
+ memcmp(&osaftransportd_addr, &sender_addr, addrlen) == 0)
+ break;
+ }
+ if (result < 0) {
+ perror("Failed to receive reply from osaftransportd");
+ return false;
+ } else if (static_cast<size_t>(result) != (sizeof(struct
Osaflog::cmd_osaflog_resp) )) {
+ struct Osaflog::cmd_osaflog_resp cmdreply_buf;
+ memcpy(&cmdreply_buf,buf,result);
+ if(cmdreply_buf.m_cmdreply != Osaflog::RMAXFILESIZE) {
+ fprintf(stderr, "Received unexpected reply from osaftransportd\n");
+ return false;
+ }
+ }
+ return true;
+}
+
+bool NoOfBackupFiles(size_t nooffiles) {
auto sock = std::unique_ptr<base::UnixServerSocket>(CreateSocket());
if (!sock) {
@@ -97,17 +230,21 @@ bool Flush() {
struct sockaddr_un osaftransportd_addr;
socklen_t addrlen = base::UnixSocket::SetAddress(Osaflog::kServerSocketPath,
&osaftransportd_addr);
- const char flush_command[] = "?flush";
- ssize_t result = sock->SendTo(flush_command, sizeof(flush_command) - 1,
- &osaftransportd_addr, addrlen);
+
+ struct Osaflog::cmd_osaflog maxbackups_cmd;
+ maxbackups_cmd.marker[0]='?';
+ maxbackups_cmd.m_cmd = Osaflog::MAXBACKUPS;
+ maxbackups_cmd.m_value = nooffiles;
+
+ ssize_t result = sock->SendTo(&maxbackups_cmd, sizeof(maxbackups_cmd),
&osaftransportd_addr, addrlen);
if (result < 0) {
perror("Failed to send message to osaftransportd");
return false;
- } else if (static_cast<size_t>(result) != (sizeof(flush_command) - 1)) {
+ }else if (static_cast<size_t>(result) != (sizeof(struct
Osaflog::cmd_osaflog))) {
fprintf(stderr, "Failed to send message to osaftransportd\n");
return false;
}
- static const char expected_reply[] = "!flush";
+
struct timespec end_time = base::ReadMonotonicClock() + base::kTenSeconds;
for (;;) {
struct pollfd fds {
@@ -139,11 +276,88 @@ bool Flush() {
if (result < 0) {
perror("Failed to receive reply from osaftransportd");
return false;
- } else if (static_cast<size_t>(result) != (sizeof(expected_reply) - 1) ||
- memcmp(buf, expected_reply, result) != 0) {
- fprintf(stderr, "Received unexpected reply from osaftransportd\n");
+ } else if (static_cast<size_t>(result) != (sizeof(struct
Osaflog::cmd_osaflog_resp) )) {
+ struct Osaflog::cmd_osaflog_resp cmdreply_buf;
+ memcpy(&cmdreply_buf,buf,result);
+ if(cmdreply_buf.m_cmdreply != Osaflog::RMAXBACKUPS) {
+
+ fprintf(stderr, "Received unexpected reply from osaftransportd for
backupfiles\n");
+ return false;
+ }
+ }
+ return true;
+}
+
+// Sends a FLUSH command to osaftransportd and waits for the matching reply.
+//
+// @param flush_done  when true no request is sent and true is returned
+//                    immediately.
+// @return true when nothing had to be done, or when a reply of exactly
+//         sizeof(Osaflog::cmd_osaflog_resp) carrying Osaflog::RFLUSH was
+//         received from the address the request was sent to; false on socket
+//         creation failure, send/receive error, timeout (base::kTenSeconds)
+//         or an unexpected reply.
+bool Flush(bool flush_done) {
+  if (flush_done) {
+    return true;
+  }
+  auto sock = std::unique_ptr<base::UnixServerSocket>(CreateSocket());
+  if (!sock) {
+    fprintf(stderr, "Failed to create UNIX domain socket\n");
+    return false;
+  }
+
+  struct sockaddr_un osaftransportd_addr;
+  socklen_t addrlen = base::UnixSocket::SetAddress(Osaflog::kServerSocketPath,
+                                                   &osaftransportd_addr);
+
+  // Value-initialize the request so the marker bytes after the first one are
+  // not sent with whatever happened to be on the stack.
+  struct Osaflog::cmd_osaflog flush_cmd{};
+  flush_cmd.marker[0] = '?';
+  flush_cmd.m_cmd = Osaflog::FLUSH;
+  flush_cmd.m_value = 0;
+
+  ssize_t result = sock->SendTo(&flush_cmd, sizeof(flush_cmd),
+                                &osaftransportd_addr, addrlen);
+  if (result < 0) {
+    perror("Failed to send message to osaftransportd");
+    return false;
+  } else if (static_cast<size_t>(result) != sizeof(flush_cmd)) {
+    fprintf(stderr, "Failed to send message to osaftransportd\n");
     return false;
   }
+
+  // Keep receiving until a datagram arrives from the address the request was
+  // sent to, an error occurs, or the overall deadline expires.
+  struct timespec end_time = base::ReadMonotonicClock() + base::kTenSeconds;
+  for (;;) {
+    struct pollfd fds {
+      sock->fd(), POLLIN, 0
+    };
+    struct timespec current_time = base::ReadMonotonicClock();
+    if (current_time >= end_time) {
+      fprintf(stderr, "Timeout\n");
+      return false;
+    }
+    struct timespec timeout = end_time - current_time;
+    result = ppoll(&fds, 1, &timeout, nullptr);
+    if (result < 0) {
+      perror("Failed to wait for reply from osaftransportd");
+      return false;
+    } else if (result == 0) {
+      fprintf(stderr, "Timeout\n");
+      return false;
+    }
+    struct sockaddr_un sender_addr;
+    socklen_t sender_addrlen = sizeof(sender_addr);
+    result = sock->RecvFrom(buf, sizeof(buf), &sender_addr, &sender_addrlen);
+    if (result < 0) break;
+    // Datagrams from any other sender are ignored and the wait continues.
+    if (sender_addrlen == addrlen &&
+        memcmp(&osaftransportd_addr, &sender_addr, addrlen) == 0)
+      break;
+  }
+  if (result < 0) {
+    perror("Failed to receive reply from osaftransportd");
+    return false;
+  }
+
+  // Decode the reply only when it has exactly the expected size: copying
+  // `result` bytes could overrun cmdreply_buf, and a reply of the right size
+  // must still have its reply code checked.
+  struct Osaflog::cmd_osaflog_resp cmdreply_buf{};
+  if (static_cast<size_t>(result) != sizeof(cmdreply_buf)) {
+    fprintf(stderr,
+            "Received unexpected reply from osaftransportd for flush\n");
+    return false;
+  }
+  memcpy(&cmdreply_buf, buf, sizeof(cmdreply_buf));
+  if (cmdreply_buf.m_cmdreply != Osaflog::RFLUSH) {
+    fprintf(stderr,
+            "Received unexpected reply from osaftransportd for flush\n");
+    return false;
+  }
   return true;
 }
diff --git a/src/dtm/transport/log_server.cc b/src/dtm/transport/log_server.cc
index 780feb1..bee2102 100644
--- a/src/dtm/transport/log_server.cc
+++ b/src/dtm/transport/log_server.cc
@@ -25,18 +25,16 @@
#include "dtm/common/osaflog_protocol.h"
#include "osaf/configmake.h"
-#define TRANSPORTD_CONFIG_FILE PKGSYSCONFDIR "/transportd.conf"
-
-size_t LogServer::no_of_backups = 9;
-size_t LogServer::kmax_file_size = 5000 * 1024;
const Osaflog::ClientAddressConstantPrefix LogServer::address_header_{};
+// Creates the log server with the built-in defaults (9 backups, 5000 KiB per
+// file) and registers the initial "mds.log" stream. The initial stream takes
+// its size limit from max_file_size_ so the default is spelled out only once.
 LogServer::LogServer(int term_fd)
     : term_fd_{term_fd},
+      no_of_backups_{9},
+      max_file_size_{5000 * 1024},
       log_socket_{Osaflog::kServerSocketPath, base::UnixSocket::kNonblocking},
       log_streams_{},
-      current_stream_{new LogStream{"mds.log", 1, LogServer::kmax_file_size}},
+      current_stream_{new LogStream{"mds.log", 1, max_file_size_}},
       no_of_log_streams_{1} {
   log_streams_["mds.log"] = current_stream_;
 }
@@ -48,11 +46,6 @@ LogServer::~LogServer() {
void LogServer::Run() {
struct pollfd pfd[2] = {{term_fd_, POLLIN, 0}, {log_socket_.fd(), POLLIN,
0}};
- /* Initialize a signal handler for loading new configuration from transportd.conf */
- if ((signal(SIGUSR2, usr2_sig_handler)) == SIG_ERR) {
- syslog(LOG_ERR,"signal USR2 registration failed: %s", strerror(errno));
- }
-
do {
for (int i = 0; i < 256; ++i) {
char* buffer = current_stream_->current_buffer_position();
@@ -101,12 +94,6 @@ void LogServer::Run() {
} while ((pfd[0].revents & POLLIN) == 0);
}
-void LogServer::usr2_sig_handler(int sig) {
- syslog(LOG_ERR, "Recived the SIGUSR2 Signal");
- ReadConfig(TRANSPORTD_CONFIG_FILE);
- signal(SIGUSR2, usr2_sig_handler);
-}
-
LogServer::LogStream* LogServer::GetStream(const char* msg_id,
size_t msg_id_size) {
if (msg_id_size == current_stream_->log_name_size() &&
@@ -119,7 +106,7 @@ LogServer::LogStream* LogServer::GetStream(const char*
msg_id,
if (no_of_log_streams_ >= kMaxNoOfStreams) return nullptr;
if (!ValidateLogName(msg_id, msg_id_size)) return nullptr;
- LogStream* stream = new LogStream{log_name, LogServer::no_of_backups, LogServer::kmax_file_size};
+ LogStream* stream = new LogStream{log_name, no_of_backups_, max_file_size_};
auto result = log_streams_.insert(
std::map<std::string, LogStream*>::value_type{log_name, stream});
if (!result.second) osaf_abort(msg_id_size);
@@ -171,7 +158,7 @@ bool LogServer::ReadConfig(const char
*transport_config_file) {
maxFileSize = atoi(&line[tag_len]);
if (maxFileSize > 1) {
- LogServer::kmax_file_size = maxFileSize * 1024 * 1024;
+ max_file_size_ = maxFileSize * 1024 * 1024;
}
}
@@ -181,7 +168,7 @@ bool LogServer::ReadConfig(const char *transport_config_file) {
noOfBackupFiles = atoi(&line[tag_len]);
if (noOfBackupFiles > 1) {
- LogServer::no_of_backups = noOfBackupFiles;
+ no_of_backups_ = noOfBackupFiles;
}
}
}
@@ -210,8 +197,10 @@ void LogServer::ExecuteCommand(const char* command, size_t
size,
const struct sockaddr_un& addr,
socklen_t addrlen) {
if (ValidateAddress(addr, addrlen)) {
- std::string cmd_result = ExecuteCommand(std::string{command, size});
- log_socket_.SendTo(cmd_result.data(), cmd_result.size(), &addr, addrlen);
+
+ struct Osaflog::cmd_osaflog_resp cmdreply_buf;
+ cmdreply_buf.m_cmdreply = ExecuteCommand(command,size);
+ log_socket_.SendTo(&cmdreply_buf, sizeof(cmdreply_buf), &addr, addrlen);
}
}
@@ -224,21 +213,28 @@ bool LogServer::ValidateAddress(const struct sockaddr_un& addr,
}
}
-std::string LogServer::ExecuteCommand(const std::string& command) {
-  if (command == "?flush") {
-    for (const auto& s : log_streams_) {
-      LogStream* stream = s.second;
-      stream->Flush();
-    }
-    return std::string{"!flush"};
-  } else {
-    return std::string{"!not_supported"};
+// Decodes one command message and applies it to this LogServer.
+//
+// @param command  start of the received message; must hold exactly one
+//                 struct Osaflog::cmd_osaflog.
+// @param size     number of bytes available at command.
+// @return the reply code matching the executed command, or Osaflog::FAILURE
+//         when the message has the wrong size or the command is not known.
+enum Osaflog::cmdreply LogServer::ExecuteCommand(const char* command,
+                                                 size_t size) {
+  struct Osaflog::cmd_osaflog cmd_buf;
+  // size is the length of the received message: copying it unchecked would
+  // overflow cmd_buf for an oversized message and leave fields uninitialized
+  // for a short one.
+  if (size != sizeof(cmd_buf)) return Osaflog::FAILURE;
+  memcpy(&cmd_buf, command, sizeof(cmd_buf));
+
+  if (cmd_buf.m_cmd == Osaflog::MAXFILESIZE) {
+    // Scaled the same way ReadConfig() scales the configured value.
+    max_file_size_ = cmd_buf.m_value * 1024 * 1024;
+    return Osaflog::RMAXFILESIZE;
+  } else if (cmd_buf.m_cmd == Osaflog::MAXBACKUPS) {
+    no_of_backups_ = cmd_buf.m_value;
+    return Osaflog::RMAXBACKUPS;
+  } else if (cmd_buf.m_cmd == Osaflog::FLUSH) {
+    // Flush every stream before acknowledging, as the "?flush" command did.
+    for (const auto& s : log_streams_) {
+      s.second->Flush();
+    }
+    return Osaflog::RFLUSH;
   }
+  return Osaflog::FAILURE;
 }
+// Creates a stream named log_name whose log writer keeps no_of_backups
+// backup files and is given max_file_size as its size limit. Aborts when
+// log_name is longer than kMaxLogNameSize.
 LogServer::LogStream::LogStream(const std::string& log_name,
-                                size_t no_of_backups, size_t kmax_file_size)
-    : log_name_{log_name}, last_flush_{}, log_writer_{log_name, no_of_backups, kmax_file_size} {
+                                size_t no_of_backups, size_t max_file_size)
+    : log_name_{log_name},
+      last_flush_{},
+      log_writer_{log_name, no_of_backups, max_file_size} {
   if (log_name.size() > kMaxLogNameSize) osaf_abort(log_name.size());
 }
diff --git a/src/dtm/transport/log_server.h b/src/dtm/transport/log_server.h
index 95ea980..287097c 100644
--- a/src/dtm/transport/log_server.h
+++ b/src/dtm/transport/log_server.h
@@ -33,6 +33,7 @@
class LogServer {
public:
static constexpr size_t kMaxNoOfStreams = 32;
+ static constexpr const char* kTransportdConfigFile = PKGSYSCONFDIR
"/transportd.conf";
// @a term_fd is a file descriptor that will become readable when the
program
// should exit because it has received the SIGTERM signal.
explicit LogServer(int term_fd);
@@ -43,7 +44,7 @@ class LogServer {
// by making the term_fd (provided in the constructor) readable.
void Run();
// To read Transportd.conf
- static bool ReadConfig(const char *transport_config_file);
+ bool ReadConfig(const char *transport_config_file);
private:
class LogStream {
@@ -85,11 +86,12 @@ class LogServer {
static void usr2_sig_handler(int sig);
static bool ValidateAddress(const struct sockaddr_un& addr,
socklen_t addrlen);
- std::string ExecuteCommand(const std::string& command);
+// std::string ExecuteCommand(const std::string& command);
+ enum Osaflog::cmdreply ExecuteCommand(const char* command, size_t size);
int term_fd_;
// Configuration for LogServer
- static size_t no_of_backups;
- static size_t kmax_file_size;
+ size_t no_of_backups_;
+ size_t max_file_size_;
base::UnixServerSocket log_socket_;
std::map<std::string, LogStream*> log_streams_;
diff --git a/src/dtm/transport/main.cc b/src/dtm/transport/main.cc
index 0d1fedf..7d9395a 100644
--- a/src/dtm/transport/main.cc
+++ b/src/dtm/transport/main.cc
@@ -27,8 +27,7 @@
#include "dtm/transport/transport_monitor.h"
-#define TRANSPORTD_CONFIG_FILE PKGSYSCONFDIR "/transportd.conf"
-
+static constexpr const char* kTransportdConfigFile = PKGSYSCONFDIR
"/transportd.conf";
constexpr static const int kDaemonStartWaitTimeInSeconds = 15;
enum Termination { kExit, kDaemonExit, kReboot };
@@ -42,6 +41,7 @@ struct Result {
static void* LogServerStartFunction(void* instance) {
LogServer* log_server = static_cast<LogServer*>(instance);
+ log_server->ReadConfig(kTransportdConfigFile);
log_server->Run();
return nullptr;
}
@@ -85,7 +85,6 @@ Result MainFunction(int term_fd) {
pthread_attr_destroy(&attr);
return Result{kExit, "pthread_attr_setinheritsched() failed", result};
}
- LogServer::ReadConfig(TRANSPORTD_CONFIG_FILE);
LogServer log_server{term_fd};
pthread_t thread_id;
result =
diff --git a/src/dtm/transport/transportd.conf
b/src/dtm/transport/transportd.conf
index 48b334f..f0c15b3 100644
--- a/src/dtm/transport/transportd.conf
+++ b/src/dtm/transport/transportd.conf
@@ -1,13 +1,13 @@
# This file contains configuration for the Transportd service
#
-# TRANSPORT_MAX_LOG_FILESIZE: The maximum size of the log file. The size
value should
-# be in MB's i.e if you give 6 then it is treated as 6 MB. By default value
will be
+# TRANSPORT_MAX_LOG_FILESIZE: The maximum size of the log file. The size value should
+# be in MB, i.e. if you give 6 then it is treated as 6 MB. The default value is
# 5 MB
-TRANSPORT_MAX_LOG_FILESIZE=5
+#TRANSPORT_MAX_LOG_FILESIZE=5