Ack, code review only; minor comment below. /Thanks, HansN


On 10/10/2017 06:44 PM, Anders Widell wrote:
Extend the OpenSAF internal log service so that it supports multiple log streams
in addition to the MDS log. All log streams are received on the same socket and
the stream name is extracted from the MSGID field. Each stream is written to a
separate file based on the name of the stream.
---
  src/dtm/transport/log_server.cc            | 107 ++++++++++++++++++++++++++---
  src/dtm/transport/log_server.h             |  36 +++++++++-
  src/dtm/transport/log_writer.cc            |  16 ++---
  src/dtm/transport/log_writer.h             |  10 +--
  src/dtm/transport/osaf-transport.in        |   2 +-
  src/dtm/transport/tests/log_writer_test.cc |   2 +-
  src/dtm/transport/transport_monitor.h      |   7 --
  src/log/apitest/tet_cfg_destination.c      |   8 +--
  src/mds/mds_log.cc                         |   2 +-
  9 files changed, 151 insertions(+), 39 deletions(-)

diff --git a/src/dtm/transport/log_server.cc b/src/dtm/transport/log_server.cc
index c02bfd38a..03bee2353 100644
--- a/src/dtm/transport/log_server.cc
+++ b/src/dtm/transport/log_server.cc
@@ -17,6 +17,7 @@
   */
#include "dtm/transport/log_server.h"
+#include <cstring>
  #include "base/getenv.h"
  #include "base/osaf_poll.h"
  #include "base/time.h"
@@ -26,18 +27,22 @@ LogServer::LogServer(int term_fd)
      : term_fd_{term_fd},
        log_socket_{
            base::GetEnv<std::string>("pkglocalstatedir", PKGLOCALSTATEDIR) +
-          "/mds_log.sock"},
-      log_writer_{} {}
+          "/osaf_log.sock"},
+      log_streams_{},
+      current_stream_{new LogStream{"mds.log"}},
+      no_of_log_streams_{1} {
+  log_streams_["mds.log"] = current_stream_;
+}
-LogServer::~LogServer() {}
+LogServer::~LogServer() {
+  for (const auto& s : log_streams_) delete s.second;
+}
void LogServer::Run() {
    struct pollfd pfd[2] = {{term_fd_, POLLIN, 0}, {log_socket_.fd(), POLLIN, 0}};
-  struct timespec last_flush {};
    do {
-    if (log_writer_.empty()) last_flush = base::ReadMonotonicClock();
      for (int i = 0; i < 256; ++i) {
-      char* buffer = log_writer_.current_buffer_position();
+      char* buffer = current_stream_->current_buffer_position();
        ssize_t result = log_socket_.Recv(buffer, LogWriter::kMaxMessageSize);
        if (result < 0) break;
        while (result != 0 && buffer[result - 1] == '\n') --result;
@@ -46,15 +51,95 @@ void LogServer::Run() {
        } else {
          buffer[result - 1] = '\n';
        }
-      log_writer_.Write(result);
+      size_t msg_id_size;
+      const char* msg_id = GetField(buffer, result, 5, &msg_id_size);
+      if (msg_id == nullptr) continue;
+      LogStream* stream = GetStream(msg_id, msg_id_size);
+      if (stream == nullptr) continue;
+      if (stream != current_stream_) {
+        memcpy(stream->current_buffer_position(), buffer, result);
+        current_stream_ = stream;
+      }
+      current_stream_->Write(result);
      }
      struct timespec current = base::ReadMonotonicClock();
-    if ((current - last_flush) >= base::kFifteenSeconds) {
-      log_writer_.Flush();
-      last_flush = current;
+    struct timespec last_flush = current;
+    bool empty = true;
+    for (const auto& s : log_streams_) {
+      LogStream* stream = s.second;
+      struct timespec flush = stream->last_flush();
+      if ((current - flush) >= base::kFifteenSeconds) {
+        stream->Flush();
+      } else {
+        if (flush < last_flush) last_flush = flush;
+      }
+      if (!stream->empty()) empty = false;
      }
      struct timespec timeout = (last_flush + base::kFifteenSeconds) - current;
      pfd[1].fd = log_socket_.fd();
-    osaf_ppoll(pfd, 2, log_writer_.empty() ? nullptr : &timeout, nullptr);
+    osaf_ppoll(pfd, 2, empty ? nullptr : &timeout, nullptr);
    } while ((pfd[0].revents & POLLIN) == 0);
  }
+
+const char* LogServer::GetField(const char* buf, size_t size, int field_no,
+                                size_t* field_size) {
+  while (field_no != 0) {
+    const char* pos = static_cast<const char*>(memchr(buf, ' ', size));
+    if (pos == nullptr) return nullptr;
+    ++pos;
+    size -= pos - buf;
+    buf = pos;
+    --field_no;
+  }
+  const char* pos = static_cast<const char*>(memchr(buf, ' ', size));
+  *field_size = (pos != nullptr) ? (pos - buf) : size;
+  return buf;
+}
+
+LogServer::LogStream* LogServer::GetStream(const char* msg_id,
+                                           size_t msg_id_size) {
+  if (msg_id_size == current_stream_->log_name_size() &&
+      memcmp(msg_id, current_stream_->log_name_data(), msg_id_size) == 0) {
+    return current_stream_;
+  }
+  std::string log_name{msg_id, msg_id_size};
+  auto iter = log_streams_.find(log_name);
+  if (iter != log_streams_.end()) return iter->second;
+  if (no_of_log_streams_ >= kMaxNoOfStreams) return nullptr;
+  if (!ValidateLogName(msg_id, msg_id_size)) return nullptr;
+  LogStream* stream = new LogStream{log_name};
+  auto result = log_streams_.insert(
+      std::map<std::string, LogStream*>::value_type{log_name, stream});
+  if (!result.second) osaf_abort(msg_id_size);
+  ++no_of_log_streams_;
+  return stream;
+}
+
+bool LogServer::ValidateLogName(const char* msg_id, size_t msg_id_size) {
[HansN] this function can be replaced with a std::regex:

bool LogServer::ValidateLogName(const char* msg_id, size_t msg_id_size) {

  if (msg_id_size < 1 || msg_id_size > LogStream::kMaxLogNameSize) return false;
  std::regex is_valid{R"([_[:alpha:]]\.?\w*)"};

   if (std::regex_match(msg_id, is_valid)) {
    return true;
   } else {
    return false;
  }
}
+  if (msg_id_size < 1 || msg_id_size > LogStream::kMaxLogNameSize) return false;
+  if (msg_id[0] == '.') return false;
+  size_t no_of_dots = 0;
+  for (size_t i = 0; i != msg_id_size; ++i) {
+    char c = msg_id[i];
+    if (!((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
+          (c >= '0' && c <= '9') || (c == '.' || c == '_')))
+      return false;
+    if (c == '.') ++no_of_dots;
+  }
+  return no_of_dots < 2;
+}
+
+LogServer::LogStream::LogStream(const std::string& log_name)
+    : log_name_{log_name}, last_flush_{}, log_writer_{log_name} {
+  if (log_name.size() > kMaxLogNameSize) osaf_abort(log_name.size());
+}
+
+void LogServer::LogStream::Write(size_t size) {
+  if (log_writer_.empty()) last_flush_ = base::ReadMonotonicClock();
+  log_writer_.Write(size);
+}
+
+void LogServer::LogStream::Flush() {
+  log_writer_.Flush();
+  last_flush_ = base::ReadMonotonicClock();
+}
diff --git a/src/dtm/transport/log_server.h b/src/dtm/transport/log_server.h
index ab275800d..27f2472f1 100644
--- a/src/dtm/transport/log_server.h
+++ b/src/dtm/transport/log_server.h
@@ -19,6 +19,7 @@
  #define DTM_TRANSPORT_LOG_SERVER_H_
#include <cstddef>
+#include <map>
  #include <string>
  #include "base/macros.h"
  #include "base/unix_server_socket.h"
@@ -28,6 +29,7 @@
  // and sends them to a LogWriter instance.
  class LogServer {
   public:
+  static constexpr size_t kMaxNoOfStreams = 32;
    // @a term_fd is a file descriptor that will become readable when the program
    // should exit because it has received the SIGTERM signal.
    explicit LogServer(int term_fd);
@@ -39,9 +41,41 @@ class LogServer {
    void Run();
private:
+  class LogStream {
+   public:
+    static constexpr size_t kMaxLogNameSize = 32;
+    LogStream(const std::string& log_name);
+
+    size_t log_name_size() const { return log_name_.size(); }
+    const char* log_name_data() const { return log_name_.data(); }
+    char* current_buffer_position() {
+      return log_writer_.current_buffer_position();
+    }
+    bool empty() const { return log_writer_.empty(); }
+    // Write @a size bytes of log message data in the memory pointed to by @a
+    // buffer to the MDS log file. After the log message has been written, the
+    // file will be rotated if necessary. This method performs blocking file
+    // I/O.
+    void Write(size_t size);
+    void Flush();
+    struct timespec last_flush() const {
+      return last_flush_;
+    }
+
+   private:
+    const std::string log_name_;
+    struct timespec last_flush_;
+    LogWriter log_writer_;
+  };
+  static const char* GetField(const char* buf, size_t size, int field_no,
+                              size_t* field_size);
+  LogStream* GetStream(const char* msg_id, size_t msg_id_size);
+  static bool ValidateLogName(const char* msg_id, size_t msg_id_size);
    int term_fd_;
    base::UnixServerSocket log_socket_;
-  LogWriter log_writer_;
+  std::map<std::string, LogStream*> log_streams_;
+  LogStream* current_stream_;
+  size_t no_of_log_streams_;
DELETE_COPY_AND_MOVE_OPERATORS(LogServer);
  };
diff --git a/src/dtm/transport/log_writer.cc b/src/dtm/transport/log_writer.cc
index c6501eca2..0025c7601 100644
--- a/src/dtm/transport/log_writer.cc
+++ b/src/dtm/transport/log_writer.cc
@@ -25,10 +25,10 @@
  #include "base/getenv.h"
  #include "osaf/configmake.h"
-LogWriter::LogWriter()
-    : mds_log_file_{base::GetEnv<std::string>("pkglogdir", PKGLOGDIR) +
-                    "/mds.log"},
-      old_mds_log_file_{mds_log_file_ + ".1"},
+LogWriter::LogWriter(const std::string& log_name)
+    : log_file_{base::GetEnv<std::string>("pkglogdir", PKGLOGDIR) + "/" +
+                log_name},
+      old_log_file_{log_file_ + ".1"},
        fd_{-1},
        current_file_size_{0},
        current_buffer_size_{0},
@@ -44,7 +44,7 @@ void LogWriter::Open() {
    if (fd_ < 0) {
      int fd;
      do {
-      fd = open(mds_log_file(), O_WRONLY | O_CLOEXEC | O_CREAT,
+      fd = open(log_file(), O_WRONLY | O_CLOEXEC | O_CREAT,
                  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
      } while (fd == -1 && errno == EINTR);
      if (fd >= 0) {
@@ -66,9 +66,9 @@ void LogWriter::Close() {
void LogWriter::RotateLog() {
    Close();
-  unlink(old_mds_log_file());
-  if (rename(mds_log_file(), old_mds_log_file()) != 0) {
-    unlink(mds_log_file());
+  unlink(old_log_file());
+  if (rename(log_file(), old_log_file()) != 0) {
+    unlink(log_file());
    }
  }
diff --git a/src/dtm/transport/log_writer.h b/src/dtm/transport/log_writer.h
index 546be5eaf..927e2b650 100644
--- a/src/dtm/transport/log_writer.h
+++ b/src/dtm/transport/log_writer.h
@@ -29,7 +29,7 @@ class LogWriter {
   public:
    constexpr static const size_t kMaxMessageSize = 2 * size_t{1024};
- LogWriter();
+  LogWriter(const std::string& log_name);
    virtual ~LogWriter();
char* current_buffer_position() { return buffer_ + current_buffer_size_; }
@@ -48,12 +48,12 @@ class LogWriter {
    void Close();
    void RotateLog();
- const char* mds_log_file() const { return mds_log_file_.c_str(); }
+  const char* log_file() const { return log_file_.c_str(); }
- const char* old_mds_log_file() const { return old_mds_log_file_.c_str(); }
+  const char* old_log_file() const { return old_log_file_.c_str(); }
- const std::string mds_log_file_;
-  const std::string old_mds_log_file_;
+  const std::string log_file_;
+  const std::string old_log_file_;
    int fd_;
    size_t current_file_size_;
    size_t current_buffer_size_;
diff --git a/src/dtm/transport/osaf-transport.in b/src/dtm/transport/osaf-transport.in
index 527208238..3c0936507 100644
--- a/src/dtm/transport/osaf-transport.in
+++ b/src/dtm/transport/osaf-transport.in
@@ -34,7 +34,7 @@ RETVAL=0
  binary=$pkglibdir/$osafprog
  pidfile=$pkgpiddir/$osafprog.pid
  lockfile=$lockdir/$initscript
-sockfile=$pkglocalstatedir/mds_log.sock
+sockfile=$pkglocalstatedir/osaf_log.sock
start() {
        [ -p $NIDFIFO ] || return 1
diff --git a/src/dtm/transport/tests/log_writer_test.cc b/src/dtm/transport/tests/log_writer_test.cc
index 164b079c0..649eeac78 100644
--- a/src/dtm/transport/tests/log_writer_test.cc
+++ b/src/dtm/transport/tests/log_writer_test.cc
@@ -54,7 +54,7 @@ TEST_F(LogWriterTest, ExistingFileShouldBeAppended) {
    std::ofstream ostr(tmpdir_ + std::string("/mds.log"));
    ostr << first_line << std::endl;
    ostr.close();
-  LogWriter* log_writer = new LogWriter();
+  LogWriter* log_writer = new LogWriter("mds.log");
    memcpy(log_writer->current_buffer_position(), second_line,
           sizeof(second_line) - 1);
    log_writer->current_buffer_position()[sizeof(second_line) - 1] = '\n';
diff --git a/src/dtm/transport/transport_monitor.h b/src/dtm/transport/transport_monitor.h
index 24ebee6a2..bedba9dfc 100644
--- a/src/dtm/transport/transport_monitor.h
+++ b/src/dtm/transport/transport_monitor.h
@@ -58,19 +58,12 @@ class TransportMonitor {
const char* proc_path() const { return proc_path_.c_str(); } - const char* mds_log_file() const { return mds_log_file_.c_str(); }
-
-  const char* old_mds_log_file() const { return old_mds_log_file_.c_str(); }
-
    static bool IsDir(const std::string& path);
-  static uint64_t FileSize(const std::string& path);
int term_fd_;
    const std::string fifo_file_{PKGLOCALSTATEDIR "/osafdtmd.fifo"};
    const std::string pkgpiddir_;
    const std::string proc_path_;
-  const std::string mds_log_file_;
-  const std::string old_mds_log_file_;
    const bool use_tipc_;
DELETE_COPY_AND_MOVE_OPERATORS(TransportMonitor);
diff --git a/src/log/apitest/tet_cfg_destination.c b/src/log/apitest/tet_cfg_destination.c
index 513f59cf0..d41471e71 100644
--- a/src/log/apitest/tet_cfg_destination.c
+++ b/src/log/apitest/tet_cfg_destination.c
@@ -22,9 +22,9 @@
  
//==============================================================================
  // Validate `logRecordDestinationConfiguration`
  
//==============================================================================
-const char validDest[] = "desta;UNIX_SOCKET;/var/lib/opensaf/mds_log.sock";
+const char validDest[] = "desta;UNIX_SOCKET;/var/lib/opensaf/osaf_log.sock";
  // Multi destinations
-const char multiDest1[] = "desta;UNIX_SOCKET;/var/lib/opensaf/mds_log.sock";
+const char multiDest1[] = "desta;UNIX_SOCKET;/var/lib/opensaf/osaf_log.sock";
  const char multiDest2[] = "destb;UNIX_SOCKET;/var/lib/opensaf/local.sock";
  // Delete destination
  const char delDest[] = "";
@@ -35,14 +35,14 @@ const char nildest[] = "destc;UNIX_SOCKET;";
  // Abnormal cases
  //<
  // Only destination type = "unix" is supported.
-const char invalidTypeDest[] = "test;invalid;/var/lib/opensaf/mds_log.sock";
+const char invalidTypeDest[] = "test;invalid;/var/lib/opensaf/osaf_log.sock";
  // "name" & "type" must have at least.
  const char invalidFmtDest[] = "test";
  // "name" must not contain special characters
  const char invalidNameDest[] = "destA?abc;UNIX_SOCKET;";
  // Same "name" must go with same "dest" and vice versa.
  const char invalidDuplicatedDest[] =
-    "destb;UNIX_SOCKET;/var/lib/opensaf/mds_log.sock";
+    "destb;UNIX_SOCKET;/var/lib/opensaf/osaf_log.sock";
  const char nilname[] = "destc";
// Configure destination command
diff --git a/src/mds/mds_log.cc b/src/mds/mds_log.cc
index 94c94e669..421e26317 100644
--- a/src/mds/mds_log.cc
+++ b/src/mds/mds_log.cc
@@ -120,7 +120,7 @@ bool MdsLog::Init() {
    base::Conf::InitFullyQualifiedDomainName();
    const std::string &fqdn = base::Conf::FullyQualifiedDomainName();
    instance_ =
-      new MdsLog{fqdn, app_name, process_id, PKGLOCALSTATEDIR "/mds_log.sock"};
+      new MdsLog{fqdn, app_name, process_id, PKGLOCALSTATEDIR "/osaf_log.sock"};
    return instance_ != nullptr;
  }


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
Opensaf-devel mailing list
Opensaf-devel@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/opensaf-devel

Reply via email to