On 7/14/20 3:35 PM, Dumitru Ceara wrote:
> On 6/30/20 3:24 AM, Ilya Maximets wrote:
>> For debugging purposes it is useful to be able to record all the
>> incoming transactions and commands and replay them locally under
>> debugger or with additional logging enabled.  This patch introduces
>> ability to record all the incoming stream data and replay it via new
>> stream provider named 'stream-replay'.  During the record phase all
>> the incoming stream data written to special replay_* files in the
>> application rundir.  On replay phase instead of opening real streams
>> application will open replay_* files and read all the incoming data
>> directly from them.
>>
>> If enabled for ovsdb-server, for example, this allows to record all
>> the connections and transactions from the big setup and replay them
>> locally afterwards to debug the behaviour or tests performance.
>>
>> To start application in recording mode there is a --stream-replay-record
>> cmdline option. --stream-replay is to replay previously recorded
>> streams.  If application depends on generation of UUIDs, it's likely
>> required to pass --predictable-uuids-with-seed=XXX cmdline option in
>> both runs in order to have same UUIDs generated.
>>
>> Current version doesn't work well with time-based stream events like
>> inactivity probes or any other events generated internally.  This is
>> a point for further improvement.
>>
>> Signed-off-by: Ilya Maximets <i.maxim...@ovn.org>
>> ---
>>  lib/automake.mk         |   1 +
>>  lib/stream-provider.h   |   4 +
>>  lib/stream-replay.c     | 561 ++++++++++++++++++++++++++++++++++++++++
>>  lib/stream.c            |  38 ++-
>>  lib/stream.h            |  40 ++-
>>  ovsdb/ovsdb-client.c    |   2 +-
>>  ovsdb/ovsdb-server.c    |   2 +-
>>  tests/test-jsonrpc.c    |   2 +-
>>  utilities/ovs-vsctl.c   |   2 +-
>>  vswitchd/ovs-vswitchd.c |   2 +-
>>  vtep/vtep-ctl.c         |   2 +-
>>  11 files changed, 642 insertions(+), 14 deletions(-)
>>  create mode 100644 lib/stream-replay.c
>>
>> diff --git a/lib/automake.mk b/lib/automake.mk
>> index 86940ccd2..bbe7d04f1 100644
>> --- a/lib/automake.mk
>> +++ b/lib/automake.mk
>> @@ -281,6 +281,7 @@ lib_libopenvswitch_la_SOURCES = \
>>      lib/stream-fd.c \
>>      lib/stream-fd.h \
>>      lib/stream-provider.h \
>> +    lib/stream-replay.c \
>>      lib/stream-ssl.h \
>>      lib/stream-tcp.c \
>>      lib/stream.c \
>> diff --git a/lib/stream-provider.h b/lib/stream-provider.h
>> index 75f4f059b..0ce4f6f4c 100644
>> --- a/lib/stream-provider.h
>> +++ b/lib/stream-provider.h
>> @@ -29,6 +29,7 @@ struct stream {
>>      const struct stream_class *class;
>>      int state;
>>      int error;
>> +    FILE *replay_wfd;
>>      char *name;
>>      char *peer_id;
>>  };
>> @@ -133,6 +134,7 @@ struct pstream {
>>      const struct pstream_class *class;
>>      char *name;
>>      ovs_be16 bound_port;
>> +    FILE *replay_wfd;
>>  };
>>  
>>  void pstream_init(struct pstream *, const struct pstream_class *, char 
>> *name);
>> @@ -200,5 +202,7 @@ extern const struct pstream_class pwindows_pstream_class;
>>  extern const struct stream_class ssl_stream_class;
>>  extern const struct pstream_class pssl_pstream_class;
>>  #endif
>> +extern const struct stream_class replay_stream_class;
>> +extern const struct pstream_class preplay_pstream_class;
>>  
>>  #endif /* stream-provider.h */
>> diff --git a/lib/stream-replay.c b/lib/stream-replay.c
>> new file mode 100644
>> index 000000000..16486f94e
>> --- /dev/null
>> +++ b/lib/stream-replay.c
>> @@ -0,0 +1,561 @@
>> +/*
>> + * Copyright (c) 2020, Red Hat, Inc.
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at:
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#include <config.h>
>> +#include <ctype.h>
>> +#include <errno.h>
>> +#include <poll.h>
>> +#include <stdlib.h>
>> +#include <string.h>
>> +#include <sys/socket.h>
>> +#include <sys/types.h>
>> +#include <unistd.h>
>> +#include "dirs.h"
>> +#include "ovs-atomic.h"
>> +#include "util.h"
>> +#include "stream-provider.h"
>> +#include "stream.h"
>> +#include "openvswitch/poll-loop.h"
>> +#include "openvswitch/vlog.h"
>> +
>> +VLOG_DEFINE_THIS_MODULE(stream_replay);
>> +
>> +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
>> +
>> +/* Stream replay. */
>> +
>> +static struct ovs_mutex replay_mutex = OVS_MUTEX_INITIALIZER;
>> +static int replay_seqno OVS_GUARDED_BY(replay_mutex) = 0;
>> +static atomic_int replay_state = ATOMIC_VAR_INIT(STREAM_REPLAY_NONE);
>> +
>> +void
>> +stream_replay_set_state(enum stream_replay_state state)
>> +{
>> +    atomic_store_relaxed(&replay_state, state);
>> +}
>> +
>> +enum stream_replay_state
>> +stream_replay_get_state(void)
>> +{
>> +    int state;
>> +
>> +    atomic_read_relaxed(&replay_state, &state);
>> +    return state;
>> +}
>> +
>> +static char *
>> +replay_file_name(const char *name, int seqno)
>> +{
>> +    char *local_name = xstrdup(name);
>> +    char *filename, *p, *c;
>> +    bool skip = false;
>> +
>> +    /* Replace all the numbers and special symbols with single underscore.
>> +     * Numbers might be PIDs or port numbers that could change between 
>> record
>> +     * and replay phases, special symbols might be not good as a filename.
>> +     * We have a unique seuqence number as part of the name, so we don't 
>> care
>> +     * keeping too much information. */
>> +    for (c = p = local_name; *p; p++) {
>> +         if (!isalpha((unsigned char) *p)) {
>> +             if (!skip) {
>> +                *c++ = '_';
>> +                skip = true;
>> +             }
>> +         } else {
>> +             *c++ = *p;
>> +             skip = false;
>> +         }
>> +    }
>> +    if (skip) {
>> +        c--;
>> +    }
>> +    *c = '\0';
>> +    filename = xasprintf("replay_%s_%d", local_name, seqno);
>> +    VLOG_DBG("Constructing replay filename: '%s' --> '%s' --> '%s'.",
>> +             name, local_name, filename);
>> +    free(local_name);
>> +
>> +    return filename;
>> +}
>> +
>> +/* In write mode creates a new replay file to write stream replay.
>> + * In read mode opens an existing replay file. */
>> +static int
>> +replay_file_open(const char *name, FILE **f, int *seqno)
>> +    OVS_REQUIRES(replay_mutex)
>> +{
>> +    char *file_path, *filename;
>> +    int state = stream_replay_get_state();
>> +
>> +    ovs_assert(state != STREAM_REPLAY_NONE);
>> +
>> +    filename = replay_file_name(name, replay_seqno);
>> +    file_path = abs_file_name(ovs_rundir(), filename);
>> +    free(filename);
>> +
>> +    *f = fopen(file_path, state == STREAM_REPLAY_WRITE ? "wb" : "rb");
>> +    if (!*f) {
>> +        VLOG_ERR("%s: fopen failed: %s", file_path, ovs_strerror(errno));
>> +        free(file_path);
>> +        return errno;
>> +    }
>> +    free(file_path);
>> +
>> +    if (state == STREAM_REPLAY_READ
>> +        && fread(seqno, sizeof *seqno, 1, *f) != 1) {
>> +        VLOG_INFO("%s: failed to read seqno: stream might be empty.", name);
>> +        *seqno = INT_MAX;
>> +    }
>> +    replay_seqno++;  /* New file opened. */
>> +    return 0;
>> +}
>> +
>> +static int
>> +replay_write(FILE *f, const void *buffer, int n, bool is_read)
>> +{
>> +    int state = stream_replay_get_state();
>> +    int seqno_to_write;
>> +    int retval = 0;
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return 0;
>> +    }
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +
>> +    seqno_to_write = is_read ? replay_seqno : -replay_seqno;
>> +    if (fwrite(&seqno_to_write, sizeof seqno_to_write, 1, f) != 1) {
>> +        VLOG_ERR_RL(&rl, "Failed to write seqno.");
>> +        retval = -1;
>> +        goto out;
>> +    }
>> +    if (fwrite(&n, sizeof n, 1, f) != 1) {
>> +        VLOG_ERR_RL(&rl, "Failed to write length.");
>> +        retval = -1;
>> +        goto out;
>> +    }
>> +    if (n > 0 && is_read && fwrite(buffer, 1, n, f) != n) {
>> +        VLOG_ERR_RL(&rl, "Failed to write data.");
>> +        retval = -1;
>> +    }
>> +out:
>> +    replay_seqno++; /* Write completed. */
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    return retval;
>> +}
>> +
>> +static int
>> +replay_read(FILE *f, void *buffer, int buffer_size,
>> +            int *len, int *seqno, bool is_read)
>> +    OVS_REQUIRES(replay_mutex)
>> +{
>> +    int retval = EINVAL;
>> +
>> +    if (fread(len, sizeof *len, 1, f) != 1
>> +        || (is_read && *len > buffer_size)) {
>> +        VLOG_ERR("Failed to read replay length.");
>> +        goto out;
>> +    }
>> +
>> +    if (*len > 0 && is_read && fread(buffer, 1, *len, f) != *len) {
>> +        VLOG_ERR("Failed to read replay buffer.");
>> +        goto out;
>> +    }
>> +
>> +    if (fread(seqno, sizeof *seqno, 1, f) != 1) {
>> +        *seqno = INT_MAX;  /* Most likely EOF. */
>> +        if (ferror(f)) {
>> +            VLOG_INFO("Failed to read replay seqno.");
>> +            goto out;
>> +        }
>> +    }
>> +
>> +    retval = 0;
>> +out:
>> +    replay_seqno++;  /* Read completed. */
>> +    return retval;
>> +}
>> +
>> +
>> +/* Active replay stream. */
>> +
>> +struct stream_replay
>> +{
>> +    struct stream stream;
>> +    FILE *f;
>> +    int seqno;
>> +};
>> +
>> +const struct stream_class replay_stream_class;
>> +
>> +static inline bool
>> +seqno_is_read(int seqno)
>> +{
>> +    return seqno >= 0;
>> +}
>> +
>> +static inline int
>> +normalized_seqno(int seqno)
>> +{
>> +    return seqno >= 0 ? seqno : -seqno;
>> +}
>> +
>> +
>> +/* Creates a new stream named 'name' that will emulate sending and receiving
>> + * data using replay file and stores a pointer to the stream in '*streamp'.
>> + *
>> + * Takes ownership of 'name'.
>> + *
>> + * Returns 0 if successful, otherwise a positive errno value. */
>> +static int
>> +new_replay_stream(char *name, struct stream **streamp)
>> +    OVS_REQUIRES(replay_mutex)
>> +{
>> +    struct stream_replay *s;
>> +    int seqno = 0, error;
>> +    FILE *f;
>> +
>> +    error = replay_file_open(name, &f, &seqno);
>> +    if (error) {
>> +        VLOG_ERR("%s: failed to open stream.", name);
>> +        return error;
>> +    }
>> +
>> +    s = xmalloc(sizeof *s);
>> +    stream_init(&s->stream, &replay_stream_class, 0, name);
>> +    s->f = f;
>> +    s->seqno = seqno;
>> +    *streamp = &s->stream;
>> +    return 0;
>> +}
>> +
>> +static struct stream_replay *
>> +stream_replay_cast(struct stream *stream)
>> +{
>> +    stream_assert_class(stream, &replay_stream_class);
>> +    return CONTAINER_OF(stream, struct stream_replay, stream);
>> +}
>> +
>> +void
>> +stream_replay_open_wfd(struct stream *s)
>> +{
>> +    FILE *f;
>> +    int state = stream_replay_get_state();
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return;
>> +    }
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    if (!replay_file_open(s->name, &f, NULL)) {
>> +        s->replay_wfd = f;
>> +    }
>> +    ovs_mutex_unlock(&replay_mutex);
>> +}
>> +
>> +void
>> +stream_replay_write(struct stream *s, const void *buffer, int n, bool 
>> is_read)
>> +{
>> +    int state = stream_replay_get_state();
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return;
>> +    }
>> +    if (replay_write(s->replay_wfd, buffer, n, is_read)) {
>> +        VLOG_ERR("%s: failed to write buffer.", s->name);
>> +    }
>> +}
>> +
>> +void
>> +stream_replay_close_wfd(struct stream *s)
>> +{
>> +    if (s->replay_wfd) {
>> +        fclose(s->replay_wfd);
>> +    }
>> +}
>> +
>> +static int
>> +replay_open(const char *name, char *suffix OVS_UNUSED, struct stream 
>> **streamp,
>> +          uint8_t dscp OVS_UNUSED)
>> +{
>> +    int retval;
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    retval = new_replay_stream(xstrdup(name), streamp);
>> +    ovs_mutex_unlock(&replay_mutex);
>> +
>> +    return retval;
>> +}
>> +
>> +static void
>> +replay_close(struct stream *stream)
>> +{
>> +    struct stream_replay *s = stream_replay_cast(stream);
>> +    fclose(s->f);
>> +    free(s);
>> +}
>> +
>> +static ssize_t
>> +replay_recv(struct stream *stream, void *buffer, size_t n)
>> +{
>> +    struct stream_replay *s = stream_replay_cast(stream);
>> +    int norm_seqno = normalized_seqno(s->seqno);
>> +    int error, len;
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    ovs_assert(norm_seqno >= replay_seqno);
>> +
>> +    if (norm_seqno != replay_seqno || !seqno_is_read(s->seqno)) {
>> +        error = EAGAIN;
>> +        goto unlock;
>> +    }
>> +
>> +    error = replay_read(s->f, buffer, n, &len, &s->seqno, true);
>> +    if (error) {
>> +        VLOG_ERR("%s: failed to read from replay file.", stream->name);
>> +        goto unlock;
>> +    }
>> +
>> +unlock:
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    return error ? -error : len;
>> +}
>> +
>> +static ssize_t
>> +replay_send(struct stream *stream OVS_UNUSED, const void *buffer OVS_UNUSED,
>> +            size_t n)
>> +{
>> +    struct stream_replay *s = stream_replay_cast(stream);
>> +    int norm_seqno = normalized_seqno(s->seqno);
>> +    int error, len;
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    ovs_assert(norm_seqno >= replay_seqno);
>> +
>> +    if (norm_seqno != replay_seqno || seqno_is_read(s->seqno)) {
>> +        error = EAGAIN;
>> +        goto unlock;
>> +    }
>> +
>> +    error = replay_read(s->f, NULL, 0, &len, &s->seqno, false);
>> +    if (error) {
>> +        VLOG_ERR("%s: failed to read from replay file.", stream->name);
>> +        goto unlock;
>> +    }
>> +    ovs_assert(len < 0 || len <= n);
>> +
>> +unlock:
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    return error ? -error : len;
>> +}
>> +
>> +static void
>> +replay_wait(struct stream *stream, enum stream_wait_type wait)
>> +{
>> +    struct stream_replay *s = stream_replay_cast(stream);
>> +    switch (wait) {
>> +    case STREAM_CONNECT:
>> +        /* Connect does nothing and always avaialable. */
>> +        poll_immediate_wake();
>> +        break;
>> +
>> +    case STREAM_SEND:
>> +        if (s->seqno != INT_MAX && !seqno_is_read(s->seqno)) {
>> +            /* Stream waits for write. */
>> +            poll_immediate_wake();
>> +        }
>> +        break;
>> +
>> +    case STREAM_RECV:
>> +        if (s->seqno != INT_MAX && seqno_is_read(s->seqno)) {
>> +            /* We still have something to read. */
>> +            poll_immediate_wake();
>> +        }
>> +        break;
>> +
>> +    default:
>> +        OVS_NOT_REACHED();
>> +    }
>> +}
>> +
>> +const struct stream_class replay_stream_class = {
>> +    "replay",                   /* name */
>> +    false,                      /* needs_probes */
>> +    replay_open,                /* open */
>> +    replay_close,               /* close */
>> +    NULL,                       /* connect */
>> +    replay_recv,                /* recv */
>> +    replay_send,                /* send */
>> +    NULL,                       /* run */
>> +    NULL,                       /* run_wait */
>> +    replay_wait,                /* wait */
>> +};
>> +
>> +/* Passive file descriptor stream. */
>> +
>> +struct replay_pstream
>> +{
>> +    struct pstream pstream;
>> +    FILE *f;
>> +    int seqno;
>> +};
>> +
>> +const struct pstream_class preplay_pstream_class;
>> +
>> +static struct replay_pstream *
>> +replay_pstream_cast(struct pstream *pstream)
>> +{
>> +    pstream_assert_class(pstream, &preplay_pstream_class);
>> +    return CONTAINER_OF(pstream, struct replay_pstream, pstream);
>> +}
>> +
>> +/* Creates a new pstream named 'name' that will accept new replay 
>> connections
>> + * reading them from the replay file and stores a pointer to the stream in
>> + * '*pstreamp'.
>> + *
>> + * Takes ownership of 'name'.
>> + *
>> + * Returns 0 if successful, otherwise a positive errno value. */
>> +static int
>> +preplay_listen(const char *name, char *suffix OVS_UNUSED,
>> +               struct pstream **pstreamp, uint8_t dscp OVS_UNUSED)
>> +{
>> +    int seqno = 0, error;
>> +    FILE *f;
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    error = replay_file_open(name, &f, &seqno);
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    if (error) {
>> +        VLOG_ERR("%s: failed to open pstream.", name);
>> +        return error;
>> +    }
>> +
>> +    struct replay_pstream *ps = xmalloc(sizeof *ps);
>> +    pstream_init(&ps->pstream, &preplay_pstream_class, xstrdup(name));
>> +    ps->f = f;
>> +    ps->seqno = seqno;
>> +    *pstreamp = &ps->pstream;
>> +    return 0;
>> +}
>> +
>> +void
>> +pstream_replay_open_wfd(struct pstream *ps)
>> +{
>> +    FILE *f;
>> +    int state = stream_replay_get_state();
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return;
>> +    }
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    if (!replay_file_open(ps->name, &f, NULL)) {
>> +        ps->replay_wfd = f;
>> +    }
>> +    ovs_mutex_unlock(&replay_mutex);
>> +}
>> +
>> +
>> +void
>> +pstream_replay_write_accept(struct pstream *ps, const struct stream *s)
>> +{
>> +    int state = stream_replay_get_state();
>> +    int len;
>> +
>> +    if (OVS_LIKELY(state != STREAM_REPLAY_WRITE)) {
>> +        return;
>> +    }
>> +
>> +    len = strlen(s->name);
>> +    if (replay_write(ps->replay_wfd, s->name, len, true)) {
>> +        VLOG_ERR("%s: failed to write accept name: %s", ps->name, s->name);
>> +    }
>> +}
>> +
>> +void
>> +pstream_replay_close_wfd(struct pstream *ps)
>> +{
>> +    if (ps->replay_wfd) {
>> +        fclose(ps->replay_wfd);
>> +    }
>> +}
>> +
>> +
>> +static void
>> +preplay_close(struct pstream *pstream)
>> +{
>> +    struct replay_pstream *ps = replay_pstream_cast(pstream);
>> +
>> +    fclose(ps->f);
>> +    free(ps);
>> +}
>> +
>> +#define MAX_NAME_LEN 65536
>> +
>> +static int
>> +preplay_accept(struct pstream *pstream, struct stream **new_streamp)
>> +{
>> +    struct replay_pstream *ps = replay_pstream_cast(pstream);
>> +    int norm_seqno = normalized_seqno(ps->seqno);
>> +    int retval, len;
>> +    char name[MAX_NAME_LEN];
>> +
>> +    ovs_mutex_lock(&replay_mutex);
>> +    ovs_assert(norm_seqno >= replay_seqno);
>> +
>> +    if (norm_seqno != replay_seqno || !seqno_is_read(ps->seqno)) {
>> +        retval = EAGAIN;
>> +        goto unlock;
>> +    }
>> +
>> +    retval = replay_read(ps->f, name, MAX_NAME_LEN - 1,
>> +                         &len, &ps->seqno, true);
>> +    if (retval) {
>> +        VLOG_ERR("%s: failed to read from replay file.", pstream->name);
>> +        goto unlock;
>> +    }
>> +
>> +    if (len > 0) {
>> +        name[len] = 0;
>> +        retval = new_replay_stream(xstrdup(name), new_streamp);
>> +    } else {
>> +        retval = len;
>> +    }
>> +unlock:
>> +    ovs_mutex_unlock(&replay_mutex);
>> +    return retval;
>> +}
>> +
>> +static void
>> +preplay_wait(struct pstream *pstream)
>> +{
>> +    struct replay_pstream *ps = replay_pstream_cast(pstream);
>> +
>> +    if (ps->seqno != INT_MAX) {
>> +        /* Replay always has somthing to say. */
>> +        poll_immediate_wake();
>> +    }
>> +}
>> +
>> +const struct pstream_class preplay_pstream_class = {
>> +    "preplay",
>> +    false,
>> +    preplay_listen,
>> +    preplay_close,
>> +    preplay_accept,
>> +    preplay_wait,
>> +};
>> diff --git a/lib/stream.c b/lib/stream.c
>> index e246b3773..2ee392462 100644
>> --- a/lib/stream.c
>> +++ b/lib/stream.c
>> @@ -117,7 +117,7 @@ check_stream_classes(void)
>>   * connection methods supported by the stream. */
>>  void
>>  stream_usage(const char *name, bool active, bool passive,
>> -             bool bootstrap OVS_UNUSED)
>> +             bool bootstrap OVS_UNUSED, bool replay)
> 
> This is a non backwards compatible change that will break OVN
> compilation. Is there a way we can avoid that? I guess if this series
> gets applied, a follow up OVN patch is needed immediately to fix the
> stream_usage() callers.

Yeah, I know.  One option to avoid that is to write a separate function
just for replay stuff.  Or rename this one and make 2 wrappers on top.
One with a new argument and one like it is now.

> 
>>  {
>>      /* Really this should be implemented via callbacks into the stream
>>       * providers, but that seems too heavy-weight to bother with at the
>> @@ -161,6 +161,11 @@ stream_usage(const char *name, bool active, bool 
>> passive,
>>             "  --ssl-protocols=PROTOS  list of SSL protocols to enable\n"
>>             "  --ssl-ciphers=CIPHERS   list of SSL ciphers to enable\n");
>>  #endif
>> +    if (replay) {
>> +        printf("Replay options:\n"
>> +               "  --stream-replay-record turn on writing replay files\n"
>> +               "  --stream-replay        run connections from replay 
>> files\n");
>> +    }
>>  }
>>  
>>  /* Given 'name', a stream name in the form "TYPE:ARGS", stores the class
>> @@ -185,6 +190,9 @@ stream_lookup_class(const char *name, const struct 
>> stream_class **classp)
>>          if (strlen(class->name) == prefix_len
>>              && !memcmp(class->name, name, prefix_len)) {
>>              *classp = class;
>> +            if (stream_replay_get_state() == STREAM_REPLAY_READ) {
>> +                *classp = &replay_stream_class;
>> +            }
>>              return 0;
>>          }
>>      }
>> @@ -295,6 +303,7 @@ stream_close(struct stream *stream)
>>      if (stream != NULL) {
>>          char *name = stream->name;
>>          char *peer_id = stream->peer_id;
>> +        stream_replay_close_wfd(stream);
>>          (stream->class->close)(stream);
>>          free(name);
>>          free(peer_id);
>> @@ -367,9 +376,13 @@ int
>>  stream_recv(struct stream *stream, void *buffer, size_t n)
>>  {
>>      int retval = stream_connect(stream);
>> -    return (retval ? -retval
>> -            : n == 0 ? 0
>> -            : (stream->class->recv)(stream, buffer, n));
>> +
>> +    retval = retval ? -retval
>> +             : n == 0 ? 0
>> +             : (stream->class->recv)(stream, buffer, n);
>> +
>> +    stream_replay_write(stream, buffer, retval, true);
>> +    return retval;
>>  }
>>  
>>  /* Tries to send up to 'n' bytes of 'buffer' on 'stream', and returns:
>> @@ -385,9 +398,12 @@ int
>>  stream_send(struct stream *stream, const void *buffer, size_t n)
>>  {
>>      int retval = stream_connect(stream);
>> -    return (retval ? -retval
>> -            : n == 0 ? 0
>> -            : (stream->class->send)(stream, buffer, n));
>> +    retval = retval ? -retval
>> +             : n == 0 ? 0
>> +             : (stream->class->send)(stream, buffer, n);
>> +
>> +    stream_replay_write(stream, buffer, retval, false);
>> +    return retval;
>>  }
>>  
>>  /* Allows 'stream' to perform maintenance activities, such as flushing
>> @@ -483,6 +499,9 @@ pstream_lookup_class(const char *name, const struct 
>> pstream_class **classp)
>>          if (strlen(class->name) == prefix_len
>>              && !memcmp(class->name, name, prefix_len)) {
>>              *classp = class;
>> +            if (stream_replay_get_state() == STREAM_REPLAY_READ) {
>> +                *classp = &preplay_pstream_class;
>> +            }
>>              return 0;
>>          }
>>      }
>> @@ -548,6 +567,8 @@ pstream_open(const char *name, struct pstream 
>> **pstreamp, uint8_t dscp)
>>          goto error;
>>      }
>>  
>> +    pstream_replay_open_wfd(pstream);
>> +
> 
> If we fail listening on pstream (e.g.,
> "ovsdb_jsonrpc_server|ERR|ptcp:6641:172.17.1.78: listen failed: Cannot
> assign requested address") we won't reach this point. Later on, when
> trying to replay the recorded execution we'll get an error like:
> 
> stream_replay|ERR|/var/run/openvswitch/replay_ptcp_83: fopen failed: No
> such file or directory
> stream_replay|ERR|ptcp:6641:172.17.1.78: failed to open pstream.
> 
> I was a bit confused at first when trying the replay and I was wondering
> if I was using the functionality properly. I wonder if we can improve
> this somehow, and if it is possible to maybe record that listen failed.

Hmm.  I never cathed a listen failure, but yes, this event should be
recorded and replayed.

> 
>>      /* Success. */
>>      *pstreamp = pstream;
>>      return 0;
>> @@ -571,6 +592,7 @@ pstream_close(struct pstream *pstream)
>>  {
>>      if (pstream != NULL) {
>>          char *name = pstream->name;
>> +        pstream_replay_close_wfd(pstream);
>>          (pstream->class->close)(pstream);
>>          free(name);
>>      }
>> @@ -591,6 +613,8 @@ pstream_accept(struct pstream *pstream, struct stream 
>> **new_stream)
>>      } else {
>>          ovs_assert((*new_stream)->state != SCS_CONNECTING
>>                     || (*new_stream)->class->connect);
>> +        pstream_replay_write_accept(pstream, *new_stream);
>> +        stream_replay_open_wfd(*new_stream);
>>      }
>>      return retval;
>>  }
>> diff --git a/lib/stream.h b/lib/stream.h
>> index 77bffa498..40357137e 100644
>> --- a/lib/stream.h
>> +++ b/lib/stream.h
>> @@ -29,7 +29,8 @@ struct pstream;
>>  struct stream;
>>  struct vlog_module;
>>  
>> -void stream_usage(const char *name, bool active, bool passive, bool 
>> bootstrap);
>> +void stream_usage(const char *name, bool active, bool passive,
>> +                  bool bootstrap, bool replay);
>>  
>>  /* Bidirectional byte streams. */
>>  int stream_verify_name(const char *name);
>> @@ -94,4 +95,41 @@ enum stream_content_type {
>>  void stream_report_content(const void *, ssize_t, enum stream_content_type,
>>                             struct vlog_module *, const char *stream_name);
>>  
>> +
>> +/* Replay state. */
>> +enum stream_replay_state {
>> +    STREAM_REPLAY_NONE,
>> +    STREAM_REPLAY_WRITE,
>> +    STREAM_REPLAY_READ,
>> +};
>> +
>> +void stream_replay_set_state(enum stream_replay_state);
>> +enum stream_replay_state stream_replay_get_state(void);
>> +void stream_replay_open_wfd(struct stream *);
>> +void pstream_replay_open_wfd(struct pstream *);
>> +void stream_replay_close_wfd(struct stream *);
>> +void pstream_replay_close_wfd(struct pstream *);
>> +void stream_replay_write(struct stream *, const void *, int, bool is_read);
>> +void pstream_replay_write_accept(struct pstream *, const struct stream *);
>> +
>> +#define STREAM_REPLAY_OPTION_ENUMS  \
>> +        OPT_STREAM_REPLAY_REC,      \
>> +        OPT_STREAM_REPLAY
>> +
>> +#define STREAM_REPLAY_LONG_OPTIONS                                          
>>  \
>> +        {"stream-replay-record", no_argument, NULL, OPT_STREAM_REPLAY_REC}, 
>>  \
>> +        {"stream-replay",        no_argument, NULL, OPT_STREAM_REPLAY}
>> +
>> +#define STREAM_REPLAY_OPTION_HANDLERS                       \
>> +        case OPT_STREAM_REPLAY_REC:                         \
>> +            stream_replay_set_state(STREAM_REPLAY_WRITE);   \
> 
> I'm not sure exactly where but I think it might be useful to store the
> full command line that was use to start the process when recording was
> requested.
> 
> Maybe also automatically create a snapshot of the DB file in case the
> user forgets to do that manually.
> 
> It's not always easy to figure out what the command line is when
> ovsdb-server is started from a script, e.g., ovn-ctl starting
> ovsdb-server for NB/SB.

That is definitely useful.  For my testing I modified ovn scale test
framework to dump cmdlines and DB, so they could be used later on replay.
But yes, I think, it might be good to actually integrate and dump
right from the ovsdb process.

> 
> Thanks,
> Dumitru
> 
>> +            break;                                          \
>> +                                                            \
>> +        case OPT_STREAM_REPLAY:                             \
>> +            stream_replay_set_state(STREAM_REPLAY_READ);    \
>> +            break;
>> +
>> +#define STREAM_REPLAY_CASES \
>> +        case OPT_STREAM_REPLAY_REC: case OPT_STREAM_REPLAY:
>> +
>>  #endif /* stream.h */
>> diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c
>> index 72756eb1f..1925f5213 100644
>> --- a/ovsdb/ovsdb-client.c
>> +++ b/ovsdb/ovsdb-client.c
>> @@ -462,7 +462,7 @@ usage(void)
>>             "\nThe default SERVER is unix:%s/db.sock.\n"
>>             "The default DATABASE is Open_vSwitch.\n",
>>             program_name, program_name, ovs_rundir());
>> -    stream_usage("SERVER", true, true, true);
>> +    stream_usage("SERVER", true, true, true, false);
>>      table_usage();
>>      printf("  --timestamp                 timestamp \"monitor\" output");
>>      daemon_usage();
>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>> index 42ba0053b..3af09c8c1 100644
>> --- a/ovsdb/ovsdb-server.c
>> +++ b/ovsdb/ovsdb-server.c
>> @@ -1868,7 +1868,7 @@ usage(void)
>>             program_name, program_name, ovs_dbdir());
>>      printf("\nJSON-RPC options (may be specified any number of times):\n"
>>             "  --remote=REMOTE         connect or listen to REMOTE\n");
>> -    stream_usage("JSON-RPC", true, true, true);
>> +    stream_usage("JSON-RPC", true, true, true, false);
>>      daemon_usage();
>>      vlog_usage();
>>      replication_usage();
>> diff --git a/tests/test-jsonrpc.c b/tests/test-jsonrpc.c
>> index 04e941b14..fa9c90b95 100644
>> --- a/tests/test-jsonrpc.c
>> +++ b/tests/test-jsonrpc.c
>> @@ -109,7 +109,7 @@ usage(void)
>>             "  request REMOTE METHOD PARAMS   send request, print reply\n"
>>             "  notify REMOTE METHOD PARAMS  send notification and exit\n",
>>             program_name, program_name);
>> -    stream_usage("JSON-RPC", true, true, true);
>> +    stream_usage("JSON-RPC", true, true, true, false);
>>      daemon_usage();
>>      vlog_usage();
>>      printf("\nOther options:\n"
>> diff --git a/utilities/ovs-vsctl.c b/utilities/ovs-vsctl.c
>> index 37cc72d40..e7dc50ed5 100644
>> --- a/utilities/ovs-vsctl.c
>> +++ b/utilities/ovs-vsctl.c
>> @@ -458,7 +458,7 @@ Options:\n\
>>      vlog_usage();
>>      printf("\
>>    --no-syslog             equivalent to --verbose=vsctl:syslog:warn\n");
>> -    stream_usage("database", true, true, true);
>> +    stream_usage("database", true, true, true, false);
>>      printf("\n\
>>  Other options:\n\
>>    -h, --help                  display this help message\n\
>> diff --git a/vswitchd/ovs-vswitchd.c b/vswitchd/ovs-vswitchd.c
>> index 1e72b628b..e61a1c3b7 100644
>> --- a/vswitchd/ovs-vswitchd.c
>> +++ b/vswitchd/ovs-vswitchd.c
>> @@ -276,7 +276,7 @@ usage(void)
>>             "where DATABASE is a socket on which ovsdb-server is listening\n"
>>             "      (default: \"unix:%s/db.sock\").\n",
>>             program_name, program_name, ovs_rundir());
>> -    stream_usage("DATABASE", true, false, true);
>> +    stream_usage("DATABASE", true, false, true, false);
>>      daemon_usage();
>>      vlog_usage();
>>      printf("\nDPDK options:\n"
>> diff --git a/vtep/vtep-ctl.c b/vtep/vtep-ctl.c
>> index ab552457d..09d407ac9 100644
>> --- a/vtep/vtep-ctl.c
>> +++ b/vtep/vtep-ctl.c
>> @@ -373,7 +373,7 @@ Options:\n\
>>      vlog_usage();
>>      printf("\
>>    --no-syslog                 equivalent to 
>> --verbose=vtep_ctl:syslog:warn\n");
>> -    stream_usage("database", true, true, false);
>> +    stream_usage("database", true, true, false, false);
>>      printf("\n\
>>  Other options:\n\
>>    -h, --help                  display this help message\n\
>>
> 

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to