On Fri, 2018-03-02 at 19:25 +0100, Christophe de Dinechin wrote:
> > On 2 Mar 2018, at 16:46, Lukáš Hrázký <lhra...@redhat.com> wrote:
> > 
> > On Fri, 2018-03-02 at 14:35 +0100, Christophe de Dinechin wrote:
> > > > On 2 Mar 2018, at 14:07, Lukáš Hrázký <lhra...@redhat.com> wrote:
> > > > 
> > > > On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
> > > > > > On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhra...@redhat.com> wrote:
> > > > > > 
> > > > > > On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> > > > > > > From: Christophe de Dinechin <dinec...@redhat.com>
> > > > > > > 
> > > > > > > Doing this change will make it possible to move the capture loop 
> > > > > > > to the
> > > > > > > concrete-agent.cpp file.
> > > > > > > 
> > > > > > > Signed-off-by: Christophe de Dinechin <dinec...@redhat.com>
> > > > > > > ---
> > > > > > > include/spice-streaming-agent/errors.hpp |   2 +
> > > > > > > src/Makefile.am                          |   2 +
> > > > > > > src/message.hpp                          |  41 ++++++
> > > > > > > src/spice-streaming-agent.cpp            | 209 +------------------------------
> > > > > > > src/stream.cpp                           | 172 +++++++++++++++++++++++++
> > > > > > > src/stream.hpp                           |  55 ++++++++
> > > > > > > 6 files changed, 276 insertions(+), 205 deletions(-)
> > > > > > > create mode 100644 src/message.hpp
> > > > > > > create mode 100644 src/stream.cpp
> > > > > > > create mode 100644 src/stream.hpp
> > > > > > > 
> > > > > > > diff --git a/include/spice-streaming-agent/errors.hpp 
> > > > > > > b/include/spice-streaming-agent/errors.hpp
> > > > > > > index 870a0fd..62ae010 100644
> > > > > > > --- a/include/spice-streaming-agent/errors.hpp
> > > > > > > +++ b/include/spice-streaming-agent/errors.hpp
> > > > > > > @@ -90,4 +90,6 @@ protected:
> > > > > > > 
> > > > > > > }} // namespace spice::streaming_agent
> > > > > > > 
> > > > > > > +extern bool quit_requested;
> > > > > > 
> > > > > > Putting quit_requested into errors.hpp? Why?
> > > > > 
> > > > > Because errors.hpp deals with error conditions. You need to quit 
> > > > > other threads on signals or exceptions. See 
> > > > > https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
> > > > 
> > > > I don't think the flag belongs to the errors header at all, let alone a
> > > > public one. It's a generic control flow mechanism to signal the
> > > > termination of the program. The only relation to errors is that in case
> > > > of errors you want to (usually) also quit.
> > > 
> > > Well, ‘quit_requested’ is set for all “final” errors, whether we detect 
> > > them using exceptions or signals.
> > 
> > But a termination signal is not an error, it is the natural way to end
> > the program. Actually, exceptions thrown inside the main program loop
> > are another, second way to exit the loop besides setting the quit flag.
> 
> The termination signal in itself is not an error. But it leads to either an 
> error in one of the system calls, or a graceful interruption if we are 
> elsewhere, see below.

Ok, but I still don't think this warrants bundling the quit flag with
the exceptions...

> > 
> > From another point of view, you include errors.hpp in each place where
> > you are either throwing or catching exceptions. But the quit flag is
> > only needed in the main loop and in the signal handler.
> 
> Actually, for correct operation, it’s also needed in read_all and poll, as 
> these system calls may be interrupted. The current code is broken, I have 
> another fix but that was not submitted yet. Something like: 
> https://github.com/c3d/spice-streaming-agent/commit/de11d77e9b6fcb92f29eacc7da178624783aea6b
>  (similar tests are needed in read_all and write_all, I believe). This is 
> what fixes the “Control-C” problem.
> 
> Now, I see these as “OS errors” that are not reported through exception but 
> using errno. The proper handling of this kind of OS errors requires a check 
> of quit_requested.

I see. In one of my experiments, I managed to factor the quit_requested
flag out of the Stream functions. But that did not take this addition
into account, so it may not be possible.
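
To make the interrupted-system-call case concrete, here is a minimal
sketch of a read loop that consults the quit flag whenever read() is
interrupted. The names read_all_or_quit and quit_flag are hypothetical
(they are not taken from the patch or from the linked commit), and
error reporting is reduced to a plain std::runtime_error.

```cpp
#include <cassert>
#include <cerrno>
#include <csignal>
#include <cstddef>
#include <stdexcept>
#include <unistd.h>

// Hypothetical stand-in for the quit_requested flag discussed above.
static volatile std::sig_atomic_t quit_flag = 0;

// Hypothetical helper: read up to len bytes, stopping early on EOF or
// when a quit has been requested. Returns the number of bytes read.
std::size_t read_all_or_quit(int fd, void *buf, std::size_t len)
{
    assert(buf != nullptr || len == 0);
    std::size_t done = 0;
    while (done < len && !quit_flag) {
        ssize_t n = read(fd, static_cast<char *>(buf) + done, len - done);
        if (n < 0) {
            if (errno == EINTR) {
                continue; // the loop condition re-checks quit_flag
            }
            throw std::runtime_error("read failed");
        }
        if (n == 0) {
            break; // EOF
        }
        done += static_cast<std::size_t>(n);
    }
    return done;
}
```

Note that read() only fails with EINTR if the signal handler was
installed without SA_RESTART; otherwise the call is restarted and the
flag is only seen once the read returns.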

> Obviously, we can put this flag somewhere else if that really annoys you, but 
> frankly, I think that errors.hpp is as good a place as any.

I would be glad if we did :)

> > 
> > > > 
> > > > Therefore the quit flag should not be coupled with errors and instead
> > > > used in the main control flow spot where global error handling is
> > > > taking place (and then in the signal handler). As a static variable it
> > > > should also be proliferating through the program as little as possible.
> > > > 
> > > > I've actually had the following idea for the quit flag, which I think
> > > > promotes the locality of the flag in the class design:
> > > > 
> > > > 
> > > > // file agent.{hpp,cpp}
> > > > class Agent {
> > > > public:
> > > >   bool& quit_flag() {
> > > >       return quit_requested;
> > > >   }
> > > > 
> > > >   void do_capture() {
> > > >       while(!quit_requested) {
> > > >           // ...
> > > >       }
> > > >   }
> > > > private:
> > > >   bool quit_requested = false;
> > > > };
> > > > 
> > > > 
> > > > // file main.cpp
> > > > static bool* quit_requested = nullptr;
> > > > 
> > > > void handle_sigterm() {
> > > >   *quit_requested = true;
> > > > }
> > > > 
> > > > int main() {
> > > >   Agent agent;
> > > >   quit_requested = &agent.quit_flag();
> > > >   ...
> > > > }
> > > > 
> > > > 
> > > > Some corner case handling (the quit_requested pointer not yet set, etc.)
> > > > was left out for clarity.
> > > 
> > > I understand what you are trying to do, but you replace one global 
> > > variable with another (and several if I follow the logic below), so how is 
> > > that helping with the proliferation?
> > 
> > Only this one global variable that is in the example. And the only
> > reason it is global is because of the signal handler, there is no other
> > way for it to work. Therefore, I only make it "global locally" (excuse the
> > oxymoron :)) for the signal handler and limit what can access it as
> > much as I can.
> > 
> > (And it's just a pointer, the real flag is local to the loop which uses
> > it)
> 
> Adding one level of indirection here does not look like an improvement to me.

A level of indirection is never an improvement in itself :) The
improvement would have been having no global variable shared across
source modules.

> If you want some kind of encapsulation, why not make the signal handler and 
> the quit flag static members of the agent instead? Would that work for you? 
> Something like this: 
> https://github.com/c3d/spice-streaming-agent/commit/077ad90ad2f923b90546a3be99988ddf45746ea7
>  ?

Yes, if it has to be a global quit flag, something like this is fine by
me.
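
For reference, a minimal sketch of what I understand by "static
members of the agent". The class below is a hypothetical stand-in, not
the code from the linked commit: only the quit handling is shown, and
the member names are my own. Strictly speaking, whether a function
without C linkage may be used as a signal handler is
implementation-defined, but common compilers accept a static member
function here.

```cpp
#include <csignal>

// Hypothetical stand-in for the agent class; only quit handling is shown.
class Agent
{
public:
    // Route SIGINT and SIGTERM to the class-private handler.
    static void install_signal_handlers()
    {
        std::signal(SIGINT, handle_interrupt);
        std::signal(SIGTERM, handle_interrupt);
    }

    // Read-only view of the flag for the capture loop and other users.
    static bool quit_requested() { return quit_flag != 0; }

private:
    // The handler only sets the flag, which is async-signal-safe.
    static void handle_interrupt(int) { quit_flag = 1; }

    static volatile std::sig_atomic_t quit_flag;
};

volatile std::sig_atomic_t Agent::quit_flag = 0;
```

The flag is still a single program-wide object, but nothing outside
the class can write to it, and there is no pointer that could be unset
when the handler fires.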

> > 
> > > > 
> > > > This keeps the flag local to the loop in which it is relevant and the
> > > > static variable local to the main.cpp file. Thus it increases
> > > > modularity (which arguably we do not need that much here).
> > > > 
> > > > I would also use a different quit flag for the cursor thread (again,
> > > > local to the X11CursorUpdater) and take care of it in main() after the
> > > > Agent::do_capture() loop quits.
> > > 
> > > What benefits do you see in having multiple flags? Are there signals 
> > > where we can quit the agent without interrupting the cursor thread or 
> > > other activities?
> > 
> > No, I don't think there are. The reason is different. To me, one global
> > "quit all" flag looks like a poorly formed hierarchy of execution. For
> > one, it could introduce race conditions: if we generalize and say we
> > have several loosely dependent threads/processes and you signal them
> > all to quit, the order in which they do so is arbitrary. Of course you
> > can introduce mechanisms like join() etc. to synchronize.
> 
>  
> We have such issues presently with the cursor thread. This is fixed in my 
> series, it now exits cleanly in all test cases I threw at it instead of 
> aborting or terminating.
> 
> > 
> > But what I would consider a better design would be one main loop in the
> > main thread, that also reacts to the signals. If the main loop exits,
> > then it would tear down the other threads signalling them in whatever
> > way is natural to the thread in question and waiting for them to
> > finish. So you define the hierarchy, there is the main thread taking
> > responsibility and subthreads that are managed by it.
> 
> This is more or less what I was describing with the quit flag and signal 
> handler being static members of the agent. I’m OK with that.
> 
> > 
> > > To me, quit_requested is the archetypical example of when a global 
> > > variable should be used. There is only one “quit”, and it’s for all 
> > > threads and objects in the program. How each one of them deals with it is 
> > > local, but the “we must quit” request is global.
> > 
> > As I explained. I'm not necessarily saying your approach is wrong, just 
> > that:
> > 
> > 1. I prefer to explicitly define a main thread that has the management 
> > responsibility.
> 
> As I see it, that’s how the code after refactoring works.
> 
> > 2. The static flag shared across classes simply seems to me a thing to
> > avoid: it breaks modularity and, for example, makes it a bit more fiddly
> > to use the modules in tests…
> 
> Except for something that is truly global.
> 
> I don’t mind making it a static member of the agent.
> 
> I do mind having multiple “quit” flags, or a pointer to the quit flag.

Sure, it's not a big issue either, let's do it your way.

> What happens if the signal handler is invoked before the quit flag pointer is 
> initialized, for example?

That's one of the corner cases I mentioned, you'd have to register the
signal handler only after you initialize the flag pointer. Can be a
disadvantage, I suppose.
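
Just to spell out that corner case, a minimal sketch of the ordering I
had in mind for the pointer variant. All names are hypothetical, and
the null check in the handler is an extra guard that was not in my
earlier example. The pointer is a std::atomic so that reading it from
the handler is well defined on platforms where it is lock-free.

```cpp
#include <atomic>
#include <csignal>

// Hypothetical pointer to the flag owned by the agent object.
static std::atomic<volatile std::sig_atomic_t *> quit_flag_ptr{nullptr};

extern "C" void handle_sigterm(int)
{
    // Guard against the handler firing before the pointer is published.
    volatile std::sig_atomic_t *flag = quit_flag_ptr.load();
    if (flag != nullptr) {
        *flag = 1;
    }
}

// Hypothetical setup helper: publish the pointer first, register second.
void setup_quit_handling(volatile std::sig_atomic_t &flag)
{
    quit_flag_ptr.store(&flag);
    std::signal(SIGTERM, handle_sigterm);
}
```

With this ordering a signal arriving before setup_quit_handling() gets
the default disposition, and one arriving afterwards always finds a
valid pointer. The pointer would also have to be reset before the
flag's owner is destroyed, which is part of why the static member
variant is simpler.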

> > 
> > > > 
> > > > > 
> > > > > > 
> > > > > > > +
> > > > > > > #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> > > > > > > diff --git a/src/Makefile.am b/src/Makefile.am
> > > > > > > index 2507844..923a103 100644
> > > > > > > --- a/src/Makefile.am
> > > > > > > +++ b/src/Makefile.am
> > > > > > > @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
> > > > > > >   mjpeg-fallback.hpp \
> > > > > > >   jpeg.cpp \
> > > > > > >   jpeg.hpp \
> > > > > > > + stream.cpp \
> > > > > > > + stream.hpp \
> > > > > > >   errors.cpp \
> > > > > > >   $(NULL)
> > > > > > > diff --git a/src/message.hpp b/src/message.hpp
> > > > > > > new file mode 100644
> > > > > > > index 0000000..28b3e28
> > > > > > > --- /dev/null
> > > > > > > +++ b/src/message.hpp
> > > > > > > @@ -0,0 +1,41 @@
> > > > > > > +/* Formatting messages
> > > > > > > + *
> > > > > > > + * \copyright
> > > > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > > > + */
> > > > > > > +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > > > +#define SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > > > +
> > > > > > > +#include <spice/stream-device.h>
> > > > > > > +
> > > > > > > +namespace spice
> > > > > > > +{
> > > > > > > +namespace streaming_agent
> > > > > > > +{
> > > > > > > +
> > > > > > > +template <typename Payload, typename Info, unsigned Type>
> > > > > > > +class Message
> > > > > > > +{
> > > > > > > +public:
> > > > > > > +    template <typename ...PayloadArgs>
> > > > > > > +    Message(PayloadArgs... payload)
> > > > > > > +        : hdr(StreamDevHeader {
> > > > > > > +              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > > > > > +              .padding = 0,     // Workaround GCC bug "sorry: 
> > > > > > > not implemented"
> > > > > > > +              .type = Type,
> > > > > > > +              .size = (uint32_t) Info::size(payload...)
> > > > > > > +          })
> > > > > > > +    { }
> > > > > > > +    void write_header(Stream &stream)
> > > > > > > +    {
> > > > > > > +        stream.write_all("header", &hdr, sizeof(hdr));
> > > > > > > +    }
> > > > > > > +
> > > > > > > +protected:
> > > > > > > +    StreamDevHeader hdr;
> > > > > > > +    typedef Payload payload_t;
> > > > > > > +};
> > > > > > > +
> > > > > > > +}} // namespace spice::streaming_agent
> > > > > > > +
> > > > > > > +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP
> > > > > > > diff --git a/src/spice-streaming-agent.cpp 
> > > > > > > b/src/spice-streaming-agent.cpp
> > > > > > > index 35e65bb..c401a34 100644
> > > > > > > --- a/src/spice-streaming-agent.cpp
> > > > > > > +++ b/src/spice-streaming-agent.cpp
> > > > > > > @@ -5,6 +5,8 @@
> > > > > > > */
> > > > > > > 
> > > > > > > #include "concrete-agent.hpp"
> > > > > > > +#include "stream.hpp"
> > > > > > > +#include "message.hpp"
> > > > > > > #include "hexdump.h"
> > > > > > > #include "mjpeg-fallback.hpp"
> > > > > > > 
> > > > > > > @@ -21,11 +23,9 @@
> > > > > > > #include <inttypes.h>
> > > > > > > #include <string.h>
> > > > > > > #include <getopt.h>
> > > > > > > -#include <unistd.h>
> > > > > > > #include <errno.h>
> > > > > > > -#include <fcntl.h>
> > > > > > > +#include <unistd.h>
> > > > > > > #include <sys/time.h>
> > > > > > > -#include <poll.h>
> > > > > > > #include <syslog.h>
> > > > > > > #include <signal.h>
> > > > > > > #include <exception>
> > > > > > > @@ -57,76 +57,6 @@ static uint64_t get_time(void)
> > > > > > > 
> > > > > > > }
> > > > > > > 
> > > > > > > -class Stream
> > > > > > > -{
> > > > > > > -    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > > > > > -
> > > > > > > -public:
> > > > > > > -    Stream(const char *name)
> > > > > > > -        : codecs()
> > > > > > > -    {
> > > > > > > -        streamfd = open(name, O_RDWR);
> > > > > > > -        if (streamfd < 0) {
> > > > > > > -            throw IOError("failed to open streaming device", 
> > > > > > > errno);
> > > > > > > -        }
> > > > > > > -    }
> > > > > > > -    ~Stream()
> > > > > > > -    {
> > > > > > > -        close(streamfd);
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    const codecs_t &client_codecs() { return codecs; }
> > > > > > > -    bool streaming_requested() { return is_streaming; }
> > > > > > > -
> > > > > > > -    template <typename Message, typename ...PayloadArgs>
> > > > > > > -    void send(PayloadArgs... payload)
> > > > > > > -    {
> > > > > > > -        Message message(payload...);
> > > > > > > -        std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > > > -        message.write_header(*this);
> > > > > > > -        message.write_message_body(*this, payload...);
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    int read_command(bool blocking);
> > > > > > > -    void write_all(const char *operation, const void *buf, const 
> > > > > > > size_t len);
> > > > > > > -
> > > > > > > -private:
> > > > > > > -    int have_something_to_read(int timeout);
> > > > > > > -    void handle_stream_start_stop(uint32_t len);
> > > > > > > -    void handle_stream_capabilities(uint32_t len);
> > > > > > > -    void handle_stream_error(uint32_t len);
> > > > > > > -    void read_command_from_device(void);
> > > > > > > -
> > > > > > > -private:
> > > > > > > -    std::mutex mutex;
> > > > > > > -    codecs_t codecs;
> > > > > > > -    int streamfd = -1;
> > > > > > > -    bool is_streaming = false;
> > > > > > > -};
> > > > > > > -
> > > > > > > -template <typename Payload, typename Info, unsigned Type>
> > > > > > > -class Message
> > > > > > > -{
> > > > > > > -public:
> > > > > > > -    template <typename ...PayloadArgs>
> > > > > > > -    Message(PayloadArgs... payload)
> > > > > > > -        : hdr(StreamDevHeader {
> > > > > > > -              .protocol_version = STREAM_DEVICE_PROTOCOL,
> > > > > > > -              .padding = 0,     // Workaround GCC bug "sorry: 
> > > > > > > not implemented"
> > > > > > > -              .type = Type,
> > > > > > > -              .size = (uint32_t) Info::size(payload...)
> > > > > > > -          })
> > > > > > > -    { }
> > > > > > > -    void write_header(Stream &stream)
> > > > > > > -    {
> > > > > > > -        stream.write_all("header", &hdr, sizeof(hdr));
> > > > > > > -    }
> > > > > > > -
> > > > > > > -protected:
> > > > > > > -    StreamDevHeader hdr;
> > > > > > > -    typedef Payload payload_t;
> > > > > > > -};
> > > > > > > -
> > > > > > > class FormatMessage : public Message<StreamMsgFormat, 
> > > > > > > FormatMessage, STREAM_TYPE_FORMAT>
> > > > > > > {
> > > > > > > public:
> > > > > > > @@ -156,20 +86,6 @@ public:
> > > > > > >   }
> > > > > > > };
> > > > > > > 
> > > > > > > -class CapabilitiesMessage : public Message<StreamMsgData, 
> > > > > > > CapabilitiesMessage, STREAM_TYPE_CAPABILITIES>
> > > > > > > -{
> > > > > > > -public:
> > > > > > > -    CapabilitiesMessage() : Message() {}
> > > > > > > -    static size_t size()
> > > > > > > -    {
> > > > > > > -        return sizeof(payload_t);
> > > > > > > -    }
> > > > > > > -    void write_message_body(Stream &stream)
> > > > > > > -    {
> > > > > > > -        /* No body for capabilities message */
> > > > > > > -    }
> > > > > > > -};
> > > > > > > -
> > > > > > > class X11CursorMessage : public Message<StreamMsgCursorSet, 
> > > > > > > X11CursorMessage, STREAM_TYPE_CURSOR_SET>
> > > > > > > {
> > > > > > > public:
> > > > > > > @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream 
> > > > > > > &stream)
> > > > > > > 
> > > > > > > }} // namespace spice::streaming_agent
> > > > > > > 
> > > > > > > -static bool quit_requested = false;
> > > > > > > -
> > > > > > > -int Stream::have_something_to_read(int timeout)
> > > > > > > -{
> > > > > > > -    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > > > > -
> > > > > > > -    if (poll(&pollfd, 1, timeout) < 0) {
> > > > > > > -        syslog(LOG_ERR, "poll FAILED\n");
> > > > > > > -        return -1;
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    if (pollfd.revents == POLLIN) {
> > > > > > > -        return 1;
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    return 0;
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::handle_stream_start_stop(uint32_t len)
> > > > > > > -{
> > > > > > > -    uint8_t msg[256];
> > > > > > > -
> > > > > > > -    if (len >= sizeof(msg)) {
> > > > > > > -        throw MessageDataError("message is too long", len, 
> > > > > > > sizeof(msg));
> > > > > > > -    }
> > > > > > > -    int n = read(streamfd, &msg, len);
> > > > > > > -    if (n != (int) len) {
> > > > > > > -        throw MessageDataError("read start/stop command from 
> > > > > > > device failed", n, len, errno);
> > > > > > > -    }
> > > > > > > -    is_streaming = (msg[0] != 0); /* num_codecs */
> > > > > > > -    syslog(LOG_INFO, "GOT START_STOP message -- request to %s 
> > > > > > > streaming\n",
> > > > > > > -           is_streaming ? "START" : "STOP");
> > > > > > > -    codecs.clear();
> > > > > > > -    for (int i = 1; i <= msg[0]; ++i) {
> > > > > > > -        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > > > > > -    }
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::handle_stream_capabilities(uint32_t len)
> > > > > > > -{
> > > > > > > -    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > > > > > -
> > > > > > > -    if (len > sizeof(caps)) {
> > > > > > > -        throw MessageDataError("capability message too long", 
> > > > > > > len, sizeof(caps));
> > > > > > > -    }
> > > > > > > -    int n = read(streamfd, caps, len);
> > > > > > > -    if (n != (int) len) {
> > > > > > > -        throw MessageDataError("read capabilities from device 
> > > > > > > failed", n, len, errno);
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    // we currently do not support extensions so just reply so
> > > > > > > -    send<CapabilitiesMessage>();
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::handle_stream_error(uint32_t len)
> > > > > > > -{
> > > > > > > -    // TODO read message and use it
> > > > > > > -    throw ProtocolError("got an error message from server");
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::read_command_from_device()
> > > > > > > -{
> > > > > > > -    StreamDevHeader hdr;
> > > > > > > -    int n;
> > > > > > > -
> > > > > > > -    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > > > -    n = read(streamfd, &hdr, sizeof(hdr));
> > > > > > > -    if (n != sizeof(hdr)) {
> > > > > > > -        throw MessageDataError("read command from device 
> > > > > > > failed", n, sizeof(hdr), errno);
> > > > > > > -    }
> > > > > > > -    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > > > -        throw MessageDataError("bad protocol version", 
> > > > > > > hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    switch (hdr.type) {
> > > > > > > -    case STREAM_TYPE_CAPABILITIES:
> > > > > > > -        return handle_stream_capabilities(hdr.size);
> > > > > > > -    case STREAM_TYPE_NOTIFY_ERROR:
> > > > > > > -        return handle_stream_error(hdr.size);
> > > > > > > -    case STREAM_TYPE_START_STOP:
> > > > > > > -        return handle_stream_start_stop(hdr.size);
> > > > > > > -    }
> > > > > > > -    throw MessageDataError("unknown message type", hdr.type, 0);
> > > > > > > -}
> > > > > > > -
> > > > > > > -int Stream::read_command(bool blocking)
> > > > > > > -{
> > > > > > > -    int timeout = blocking?-1:0;
> > > > > > > -    while (!quit_requested) {
> > > > > > > -        if (!have_something_to_read(timeout)) {
> > > > > > > -            if (!blocking) {
> > > > > > > -                return 0;
> > > > > > > -            }
> > > > > > > -            sleep(1);
> > > > > > > -            continue;
> > > > > > > -        }
> > > > > > > -        read_command_from_device();
> > > > > > > -        break;
> > > > > > > -    }
> > > > > > > -
> > > > > > > -    return 1;
> > > > > > > -}
> > > > > > > -
> > > > > > > -void Stream::write_all(const char *operation, const void *buf, 
> > > > > > > const size_t len)
> > > > > > > -{
> > > > > > > -    size_t written = 0;
> > > > > > > -    while (written < len) {
> > > > > > > -        int l = write(streamfd, (const char *) buf + written, 
> > > > > > > len - written);
> > > > > > > -        if (l < 0) {
> > > > > > > -            if (errno == EINTR) {
> > > > > > > -                continue;
> > > > > > > -            }
> > > > > > > -            throw WriteError("write failed", operation, 
> > > > > > > errno).syslog();
> > > > > > > -        }
> > > > > > > -        written += l;
> > > > > > > -    }
> > > > > > > -    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", 
> > > > > > > written);
> > > > > > > -}
> > > > > > > +bool quit_requested = false;
> > > > > > > 
> > > > > > > static void handle_interrupt(int intr)
> > > > > > > {
> > > > > > > diff --git a/src/stream.cpp b/src/stream.cpp
> > > > > > > new file mode 100644
> > > > > > > index 0000000..f756097
> > > > > > > --- /dev/null
> > > > > > > +++ b/src/stream.cpp
> > > > > > > @@ -0,0 +1,172 @@
> > > > > > > +/* Encapsulation of the stream used to communicate between agent 
> > > > > > > and server
> > > > > > > + *
> > > > > > > + * \copyright
> > > > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > > > + */
> > > > > > > +
> > > > > > > +#include "stream.hpp"
> > > > > > > +#include "message.hpp"
> > > > > > > +
> > > > > > > +#include <spice/stream-device.h>
> > > > > > > +
> > > > > > > +#include <spice-streaming-agent/errors.hpp>
> > > > > > > +
> > > > > > > +#include <sys/types.h>
> > > > > > > +#include <sys/stat.h>
> > > > > > > +#include <fcntl.h>
> > > > > > > +#include <poll.h>
> > > > > > > +#include <syslog.h>
> > > > > > > +#include <unistd.h>
> > > > > > > +
> > > > > > > +namespace spice
> > > > > > > +{
> > > > > > > +namespace streaming_agent
> > > > > > > +{
> > > > > > > +
> > > > > > > +class CapabilitiesMessage : public Message<StreamMsgData, 
> > > > > > > CapabilitiesMessage,
> > > > > > > +                                           
> > > > > > > STREAM_TYPE_CAPABILITIES>
> > > > > > > +{
> > > > > > > +public:
> > > > > > > +    CapabilitiesMessage() : Message() {}
> > > > > > > +    static size_t size()
> > > > > > > +    {
> > > > > > > +        return sizeof(payload_t);
> > > > > > > +    }
> > > > > > > +    void write_message_body(Stream &stream)
> > > > > > > +    {
> > > > > > > +        /* No body for capabilities message */
> > > > > > > +    }
> > > > > > > +};
> > > > > > 
> > > > > > Not sure I like scattering the messages across source files that 
> > > > > > happen
> > > > > > to use them, though I suppose you did it because each message (like 
> > > > > > the
> > > > > > X11Cursor) may require different header files included? Perhaps it 
> > > > > > is
> > > > > > the way to go…
> > > > > 
> > > > > No, it’s really to de-couple things, a good way to check if 
> > > > > encapsulation was correct.
> > > > > 
> > > > > 
> > > > > > 
> > > > > > > +
> > > > > > > +Stream::Stream(const char *name)
> > > > > > > +    : codecs()
> > > > > > > +{
> > > > > > > +    streamfd = open(name, O_RDWR);
> > > > > > > +    if (streamfd < 0) {
> > > > > > > +        throw IOError("failed to open streaming device", errno);
> > > > > > > +    }
> > > > > > > +}
> > > > > > > +
> > > > > > > +Stream::~Stream()
> > > > > > > +{
> > > > > > > +    close(streamfd);
> > > > > > > +}
> > > > > > > +
> > > > > > > +int Stream::have_something_to_read(int timeout)
> > > > > > > +{
> > > > > > > +    struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > > > > +
> > > > > > > +    if (poll(&pollfd, 1, timeout) < 0) {
> > > > > > > +        syslog(LOG_ERR, "poll FAILED\n");
> > > > > > > +        return -1;
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    if (pollfd.revents == POLLIN) {
> > > > > > > +        return 1;
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    return 0;
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::handle_stream_start_stop(uint32_t len)
> > > > > > > +{
> > > > > > > +    uint8_t msg[256];
> > > > > > > +
> > > > > > > +    if (len >= sizeof(msg)) {
> > > > > > > +        throw MessageDataError("message is too long", len, 
> > > > > > > sizeof(msg));
> > > > > > > +    }
> > > > > > > +    int n = read(streamfd, &msg, len);
> > > > > > > +    if (n != (int) len) {
> > > > > > > +        throw MessageDataError("read start/stop command from 
> > > > > > > device failed", n, len, errno);
> > > > > > > +    }
> > > > > > > +    is_streaming = (msg[0] != 0); /* num_codecs */
> > > > > > > +    syslog(LOG_INFO, "GOT START_STOP message -- request to %s 
> > > > > > > streaming\n",
> > > > > > > +           is_streaming ? "START" : "STOP");
> > > > > > > +    codecs.clear();
> > > > > > > +    for (int i = 1; i <= msg[0]; ++i) {
> > > > > > > +        codecs.insert((SpiceVideoCodecType) msg[i]);
> > > > > > > +    }
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::handle_stream_capabilities(uint32_t len)
> > > > > > > +{
> > > > > > > +    uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
> > > > > > > +
> > > > > > > +    if (len > sizeof(caps)) {
> > > > > > > +        throw MessageDataError("capability message too long", 
> > > > > > > len, sizeof(caps));
> > > > > > > +    }
> > > > > > > +    int n = read(streamfd, caps, len);
> > > > > > > +    if (n != (int) len) {
> > > > > > > +        throw MessageDataError("read capabilities from device 
> > > > > > > failed", n, len, errno);
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    // we currently do not support extensions so just reply so
> > > > > > > +    send<CapabilitiesMessage>();
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::handle_stream_error(uint32_t len)
> > > > > > > +{
> > > > > > > +    // TODO read message and use it
> > > > > > > +    throw ProtocolError("got an error message from server");
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::read_command_from_device()
> > > > > > > +{
> > > > > > > +    StreamDevHeader hdr;
> > > > > > > +    int n;
> > > > > > > +
> > > > > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > > > +    n = read(streamfd, &hdr, sizeof(hdr));
> > > > > > > +    if (n != sizeof(hdr)) {
> > > > > > > +        throw MessageDataError("read command from device 
> > > > > > > failed", n, sizeof(hdr), errno);
> > > > > > > +    }
> > > > > > > +    if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) {
> > > > > > > +        throw MessageDataError("bad protocol version", 
> > > > > > > hdr.protocol_version, STREAM_DEVICE_PROTOCOL);
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    switch (hdr.type) {
> > > > > > > +    case STREAM_TYPE_CAPABILITIES:
> > > > > > > +        return handle_stream_capabilities(hdr.size);
> > > > > > > +    case STREAM_TYPE_NOTIFY_ERROR:
> > > > > > > +        return handle_stream_error(hdr.size);
> > > > > > > +    case STREAM_TYPE_START_STOP:
> > > > > > > +        return handle_stream_start_stop(hdr.size);
> > > > > > > +    }
> > > > > > > +    throw MessageDataError("unknown message type", hdr.type, 0);
> > > > > > > +}
> > > > > > > +
> > > > > > > +int Stream::read_command(bool blocking)
> > > > > > > +{
> > > > > > > +    int timeout = blocking?-1:0;
> > > > > > > +    while (!quit_requested) {
> > > > > > > +        if (!have_something_to_read(timeout)) {
> > > > > > > +            if (!blocking) {
> > > > > > > +                return 0;
> > > > > > > +            }
> > > > > > > +            sleep(1);
> > > > > > > +            continue;
> > > > > > > +        }
> > > > > > > +        read_command_from_device();
> > > > > > > +        break;
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    return 1;
> > > > > > > +}
> > > > > > > +
> > > > > > > +void Stream::write_all(const char *operation, const void *buf, 
> > > > > > > const size_t len)
> > > > > > > +{
> > > > > > > +    size_t written = 0;
> > > > > > > +    while (written < len) {
> > > > > > > +        int l = write(streamfd, (const char *) buf + written, 
> > > > > > > len - written);
> > > > > > > +        if (l < 0) {
> > > > > > > +            if (errno == EINTR) {
> > > > > > > +                continue;
> > > > > > > +            }
> > > > > > > +            throw WriteError("write failed", operation, 
> > > > > > > errno).syslog();
> > > > > > > +        }
> > > > > > > +        written += l;
> > > > > > > +    }
> > > > > > > +    syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", 
> > > > > > > written);
> > > > > > > +}
> > > > > > > +
> > > > > > > +}} // namespace spice::streaming_agent
> > > > > > > diff --git a/src/stream.hpp b/src/stream.hpp
> > > > > > > new file mode 100644
> > > > > > > index 0000000..b689f36
> > > > > > > --- /dev/null
> > > > > > > +++ b/src/stream.hpp
> > > > > > > @@ -0,0 +1,55 @@
> > > > > > > +/* Encapsulation of the stream used to communicate between agent 
> > > > > > > and server
> > > > > > > + *
> > > > > > > + * \copyright
> > > > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > > > + */
> > > > > > > +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP
> > > > > > > +#define SPICE_STREAMING_AGENT_STREAM_HPP
> > > > > > > +
> > > > > > > +#include <spice/enums.h>
> > > > > > > +#include <set>
> > > > > > > +#include <mutex>
> > > > > > > +
> > > > > > > +namespace spice {
> > > > > > > +namespace streaming_agent {
> > > > > > > +
> > > > > > > +class Stream
> > > > > > > +{
> > > > > > > +    typedef std::set<SpiceVideoCodecType> codecs_t;
> > > > > > > +
> > > > > > > +public:
> > > > > > > +    Stream(const char *name);
> > > > > > > +    ~Stream();
> > > > > > > +
> > > > > > > +    const codecs_t &client_codecs() { return codecs; }
> > > > > > > +    bool streaming_requested() { return is_streaming; }
> > > > > > > +
> > > > > > > +    template <typename Message, typename ...PayloadArgs>
> > > > > > > +    void send(PayloadArgs... payload)
> > > > > > > +    {
> > > > > > > +        Message message(payload...);
> > > > > > > +        std::lock_guard<std::mutex> stream_guard(mutex);
> > > > > > > +        message.write_header(*this);
> > > > > > > +        message.write_message_body(*this, payload...);
> > > > > > > +    }
> > > > > > > +
> > > > > > > +    int read_command(bool blocking);
> > > > > > > +    void write_all(const char *operation, const void *buf, const 
> > > > > > > size_t len);
> > > > > > > +
> > > > > > > +private:
> > > > > > > +    int have_something_to_read(int timeout);
> > > > > > > +    void handle_stream_start_stop(uint32_t len);
> > > > > > > +    void handle_stream_capabilities(uint32_t len);
> > > > > > > +    void handle_stream_error(uint32_t len);
> > > > > > > +    void read_command_from_device(void);
> > > > > > > +
> > > > > > > +private:
> > > > > > > +    std::mutex mutex;
> > > > > > > +    codecs_t codecs;
> > > > > > > +    int streamfd = -1;
> > > > > > > +    bool is_streaming = false;
> > > > > > > +};
> > > > > > > +
> > > > > > > +}} // namespace spice::streaming_agent
> > > > > > > +
> > > > > > > +#endif // SPICE_STREAMING_AGENT_STREAM_HPP
> 
> 
_______________________________________________
Spice-devel mailing list
Spice-devel@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/spice-devel