On Fri, 2018-03-02 at 19:25 +0100, Christophe de Dinechin wrote:
>
> On 2 Mar 2018, at 16:46, Lukáš Hrázký <lhra...@redhat.com> wrote:
> >
> > On Fri, 2018-03-02 at 14:35 +0100, Christophe de Dinechin wrote:
> > >
> > > On 2 Mar 2018, at 14:07, Lukáš Hrázký <lhra...@redhat.com> wrote:
> > > >
> > > > On Thu, 2018-03-01 at 17:57 +0100, Christophe de Dinechin wrote:
> > > > >
> > > > > On 1 Mar 2018, at 16:11, Lukáš Hrázký <lhra...@redhat.com> wrote:
> > > > > >
> > > > > > On Wed, 2018-02-28 at 16:43 +0100, Christophe de Dinechin wrote:
> > > > > > > From: Christophe de Dinechin <dinec...@redhat.com>
> > > > > > >
> > > > > > > Doing this change will make it possible to move the capture loop to the
> > > > > > > concrete-agent.cpp file.
> > > > > > >
> > > > > > > Signed-off-by: Christophe de Dinechin <dinec...@redhat.com>
> > > > > > > ---
> > > > > > > include/spice-streaming-agent/errors.hpp |   2 +
> > > > > > > src/Makefile.am                          |   2 +
> > > > > > > src/message.hpp                          |  41 ++++++
> > > > > > > src/spice-streaming-agent.cpp            | 209 +-------------------------------
> > > > > > > src/stream.cpp                           | 172 +++++++++++++++++++++++++
> > > > > > > src/stream.hpp                           |  55 ++++++++
> > > > > > > 6 files changed, 276 insertions(+), 205 deletions(-)
> > > > > > > create mode 100644 src/message.hpp
> > > > > > > create mode 100644 src/stream.cpp
> > > > > > > create mode 100644 src/stream.hpp
> > > > > > >
> > > > > > > diff --git a/include/spice-streaming-agent/errors.hpp b/include/spice-streaming-agent/errors.hpp
> > > > > > > index 870a0fd..62ae010 100644
> > > > > > > --- a/include/spice-streaming-agent/errors.hpp
> > > > > > > +++ b/include/spice-streaming-agent/errors.hpp
> > > > > > > @@ -90,4 +90,6 @@ protected:
> > > > > > >
> > > > > > >  }} // namespace spice::streaming_agent
> > > > > > >
> > > > > > > +extern bool quit_requested;
> > > > > >
> > > > > > Putting quit_requested into errors.hpp? Why?
> > > > >
> > > > > Because errors.hpp deals with error conditions. You need to quit
> > > > > other threads on signals or exceptions. See
> > > > > https://gitlab.com/c3d/spice-streaming-agent/commit/07b0e0ea9317fab3867fb29d4367be8d4ad8ba98.
> > > >
> > > > I don't think the flag belongs to the errors header at all, let alone a
> > > > public one. It's a generic control flow mechanism to signal the
> > > > termination of the program. The only relation to errors is that in case
> > > > of errors you want to (usually) also quit.
> > >
> > > Well, ‘quit_requested’ is set for all “final” errors, whether we detect
> > > them using exceptions or signals.
> >
> > But a termination signal is not an error, it is the natural way to end
> > the program. Actually, exceptions thrown inside the main program loop
> > are another, second way to exit the loop besides setting the quit flag.
>
> The termination signal in itself is not an error. But it leads to either an
> error in one of the system calls, or a graceful interruption if we are
> elsewhere, see below.

Ok, but I still don't think this warrants bundling the quit flag with
the exceptions...

> >
> > From another point of view, you include errors.hpp in each place where
> > you are either throwing or catching exceptions. But the quit flag is
> > only needed in the main loop and in the signal handler.
>
> Actually, for correct operation, it’s also needed in read_all and poll, as
> these system calls may be interrupted. The current code is broken, I have
> another fix but that was not submitted yet. Something like:
> https://github.com/c3d/spice-streaming-agent/commit/de11d77e9b6fcb92f29eacc7da178624783aea6b
> (similar tests are needed in read_all and write_all, I believe). This is
> what fixes the “Control-C” problem.
>
> Now, I see these as “OS errors” that are not reported through exception but
> using errno. The proper handling of this kind of OS errors requires a check
> of quit_requested.

I see. In one of my experiments, I managed to factor the quit_requested
flag out of the Stream functions. But that did not take this addition
into account, so it may not be possible.

> Obviously, we can put this flag somewhere else if that really annoys you, but
> frankly, I think that errors.hpp is as good a place as any.

I would be glad if we did :)

> > > >
> > > > Therefore the quit flag should not be coupled with errors and instead
> > > > used in the main control flow spot where global error handling is
> > > > taking place (and then in the signal handler). As a static variable it
> > > > should also be proliferating through the program as little as possible.
> > > >
> > > > I've actually had the following idea for the quit flag, which I think
> > > > promotes the locality of the flag in the class design:
> > > >
> > > >
> > > > // file agent.{hpp,cpp}
> > > > class Agent {
> > > > public:
> > > >     bool& quit_flag() {
> > > >         return quit_requested;
> > > >     }
> > > >
> > > >     void do_capture() {
> > > >         while(!quit_requested) {
> > > >             // ...
> > > >         }
> > > >     }
> > > > private:
> > > >     bool quit_requested = false;
> > > > };
> > > >
> > > >
> > > > // file main.cpp
> > > > static bool* quit_requested = nullptr;
> > > >
> > > > void handle_sigterm(int) {
> > > >     *quit_requested = true;
> > > > }
> > > >
> > > > int main() {
> > > >     Agent agent;
> > > >     quit_requested = &agent.quit_flag();
> > > >     ...
> > > > }
> > > >
> > > >
> > > > Some corner case handling (the quit_requested pointer not yet set, etc.)
> > > > was left out for clarity.
> > >
> > > I understand what you are trying to do, but you replace one global
> > > variable with one (and several if I follow the logic below), so how is
> > > that helping with the proliferation?
> >
> > Only this one global variable that is in the example. And the only
> > reason it is global is because of the signal handler, there is no other
> > way for it to work. Therefore, I only make it "global locally" (excuse the
> > oxymoron :)) for the signal handler and limit what can access it as
> > much as I can.
> >
> > (And it's just a pointer, the real flag is local to the loop which uses
> > it)
>
> Adding one level of indirection here does not look like an improvement to me.

A level of indirection is never an improvement in itself :) The
improvement should have been that no global variable is shared across
source modules.

> If you want some kind of encapsulation, why not make the signal handler and
> the quit flag static members of the agent instead? Would that work for you?
> Something like this:
> https://github.com/c3d/spice-streaming-agent/commit/077ad90ad2f923b90546a3be99988ddf45746ea7
> ?

Yes, if it has to be a global quit flag, something like this is fine by me.

> > > >
> > > > This keeps the flag local to the loop in which it is relevant and the
> > > > static variable local to the main.cpp file. Thus it increases
> > > > modularity (which arguably we do not need that much here).
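
For reference, a minimal sketch of how the static-member variant could
look, combined with the EINTR check around poll() discussed earlier in
this mail. It is not taken from the linked commits: the class layout and
the names install_handler(), wait_readable() and quit_flag are
hypothetical, and the use of volatile sig_atomic_t for the flag is an
assumption about what is safe to write from a signal handler.

```cpp
#include <cerrno>
#include <csignal>
#include <poll.h>
#include <signal.h>
#include <unistd.h>

// Hypothetical sketch: names are illustrative, not the agent's real API.
class Agent {
public:
    // Install the handler without SA_RESTART, so that interrupted
    // blocking calls report EINTR instead of being restarted.
    static bool install_handler(int signo)
    {
        struct sigaction sa = {};
        sa.sa_handler = handle_signal;
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = 0;
        return sigaction(signo, &sa, nullptr) == 0;
    }

    static bool quit_requested() { return quit_flag != 0; }

    // Returns 1 if fd is readable, 0 on timeout or when a quit was
    // requested, -1 on any other poll() error.
    static int wait_readable(int fd, int timeout_ms)
    {
        struct pollfd pfd = {fd, POLLIN, 0};
        while (!quit_requested()) {
            int rc = poll(&pfd, 1, timeout_ms);
            if (rc < 0) {
                if (errno == EINTR) {
                    continue;   // interrupted: re-check the quit flag
                }
                return -1;
            }
            return (rc > 0 && (pfd.revents & POLLIN)) ? 1 : 0;
        }
        return 0;
    }

private:
    // The handler only sets the flag; everything else happens in the loop.
    static void handle_signal(int) { quit_flag = 1; }

    // Assumption: volatile sig_atomic_t is used because it may be
    // written safely from a signal handler.
    static volatile std::sig_atomic_t quit_flag;
};

volatile std::sig_atomic_t Agent::quit_flag = 0;
```

With something like this, main() would only need to call
Agent::install_handler() for SIGINT and SIGTERM before entering the
capture loop, and no other source module has to see the flag.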
> > > >
> > > > I would also use a different quit flag for the cursor thread (again,
> > > > local to the X11CursorUpdater) and take care of it in main() after the
> > > > Agent::do_capture() loop quits.
> > >
> > > What benefits do you see in having multiple flags? Are there signals
> > > where we can quit the agent without interrupting the cursor thread or
> > > other activities?
> >
> > No, I don't think there are. The reason is different. To me, one global
> > "quit all" flag seems like a poorly formed hierarchy of execution. For
> > one, it could introduce race conditions: if we generalize and say we
> > have several loosely dependent threads/processes and you signal them
> > all to quit, the order in which they do so is arbitrary. Of course you
> > can introduce mechanisms like join() etc. to synchronize.
>
> We have such issues presently with the cursor thread. This is fixed in my
> series, it now exits cleanly in all test cases I threw at it instead of
> aborting or terminating.
>
> >
> > But what I would consider a better design would be one main loop in the
> > main thread, that also reacts to the signals. If the main loop exits,
> > then it would tear down the other threads signalling them in whatever
> > way is natural to the thread in question and waiting for them to
> > finish. So you define the hierarchy, there is the main thread taking
> > responsibility and subthreads that are managed by it.
>
> This is more or less what I was describing with the quit flag and signal
> handler being static members of the agent. I’m OK with that.
>
> > >
> > > To me, quit_requested is the archetypical example of when a global
> > > variable should be used. There is only one “quit”, and it’s for all
> > > threads and objects in the program. How each one of them deals with it is
> > > local, but the “we must quit” request is global.
> >
> > As I explained. I'm not necessarily saying your approach is wrong, just
> > that:
> >
> > 1. I prefer to explicitly define a main thread that has the management
> > responsibility.
>
> As I see it, that’s how the code after refactoring works.
>
> > 2. The static flag shared across classes simply seems to me a thing to
> > avoid, it breaks modularity and makes it a bit more fiddly to use the
> > modules in tests, for example…
>
> Except for something that is truly global.
>
> I don’t mind making it a static member of the agent.
>
> I do mind having multiple “quit” flags, or a pointer to the quit flag.

Sure, it's not a big issue either, let's do it your way.

> What happens if the signal handler is invoked before the quit flag pointer is
> initialized, for example?

That's one of the corner cases I mentioned: you'd have to register the
signal handler only after you initialize the flag pointer. Can be a
disadvantage, I suppose.

> > > > > >
> > > > > > > +
> > > > > > >  #endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> > > > > > > diff --git a/src/Makefile.am b/src/Makefile.am
> > > > > > > index 2507844..923a103 100644
> > > > > > > --- a/src/Makefile.am
> > > > > > > +++ b/src/Makefile.am
> > > > > > > @@ -55,5 +55,7 @@ spice_streaming_agent_SOURCES = \
> > > > > > >      mjpeg-fallback.hpp \
> > > > > > >      jpeg.cpp \
> > > > > > >      jpeg.hpp \
> > > > > > > +    stream.cpp \
> > > > > > > +    stream.hpp \
> > > > > > >      errors.cpp \
> > > > > > >      $(NULL)
> > > > > > > diff --git a/src/message.hpp b/src/message.hpp
> > > > > > > new file mode 100644
> > > > > > > index 0000000..28b3e28
> > > > > > > --- /dev/null
> > > > > > > +++ b/src/message.hpp
> > > > > > > @@ -0,0 +1,41 @@
> > > > > > > +/* Formatting messages
> > > > > > > + *
> > > > > > > + * \copyright
> > > > > > > + * Copyright 2018 Red Hat Inc. All rights reserved.
> > > > > > > + */ > > > > > > > +#ifndef SPICE_STREAMING_AGENT_MESSAGE_HPP > > > > > > > +#define SPICE_STREAMING_AGENT_MESSAGE_HPP > > > > > > > + > > > > > > > +#include <spice/stream-device.h> > > > > > > > + > > > > > > > +namespace spice > > > > > > > +{ > > > > > > > +namespace streaming_agent > > > > > > > +{ > > > > > > > + > > > > > > > +template <typename Payload, typename Info, unsigned Type> > > > > > > > +class Message > > > > > > > +{ > > > > > > > +public: > > > > > > > + template <typename ...PayloadArgs> > > > > > > > + Message(PayloadArgs... payload) > > > > > > > + : hdr(StreamDevHeader { > > > > > > > + .protocol_version = STREAM_DEVICE_PROTOCOL, > > > > > > > + .padding = 0, // Workaround GCC bug "sorry: > > > > > > > not implemented" > > > > > > > + .type = Type, > > > > > > > + .size = (uint32_t) Info::size(payload...) > > > > > > > + }) > > > > > > > + { } > > > > > > > + void write_header(Stream &stream) > > > > > > > + { > > > > > > > + stream.write_all("header", &hdr, sizeof(hdr)); > > > > > > > + } > > > > > > > + > > > > > > > +protected: > > > > > > > + StreamDevHeader hdr; > > > > > > > + typedef Payload payload_t; > > > > > > > +}; > > > > > > > + > > > > > > > +}} // namespace spice::streaming_agent > > > > > > > + > > > > > > > +#endif // SPICE_STREAMING_AGENT_MESSAGE_HPP > > > > > > > diff --git a/src/spice-streaming-agent.cpp > > > > > > > b/src/spice-streaming-agent.cpp > > > > > > > index 35e65bb..c401a34 100644 > > > > > > > --- a/src/spice-streaming-agent.cpp > > > > > > > +++ b/src/spice-streaming-agent.cpp > > > > > > > @@ -5,6 +5,8 @@ > > > > > > > */ > > > > > > > > > > > > > > #include "concrete-agent.hpp" > > > > > > > +#include "stream.hpp" > > > > > > > +#include "message.hpp" > > > > > > > #include "hexdump.h" > > > > > > > #include "mjpeg-fallback.hpp" > > > > > > > > > > > > > > @@ -21,11 +23,9 @@ > > > > > > > #include <inttypes.h> > > > > > > > #include <string.h> > > > > > > > #include <getopt.h> > > > > > > > 
-#include <unistd.h> > > > > > > > #include <errno.h> > > > > > > > -#include <fcntl.h> > > > > > > > +#include <unistd.h> > > > > > > > #include <sys/time.h> > > > > > > > -#include <poll.h> > > > > > > > #include <syslog.h> > > > > > > > #include <signal.h> > > > > > > > #include <exception> > > > > > > > @@ -57,76 +57,6 @@ static uint64_t get_time(void) > > > > > > > > > > > > > > } > > > > > > > > > > > > > > -class Stream > > > > > > > -{ > > > > > > > - typedef std::set<SpiceVideoCodecType> codecs_t; > > > > > > > - > > > > > > > -public: > > > > > > > - Stream(const char *name) > > > > > > > - : codecs() > > > > > > > - { > > > > > > > - streamfd = open(name, O_RDWR); > > > > > > > - if (streamfd < 0) { > > > > > > > - throw IOError("failed to open streaming device", > > > > > > > errno); > > > > > > > - } > > > > > > > - } > > > > > > > - ~Stream() > > > > > > > - { > > > > > > > - close(streamfd); > > > > > > > - } > > > > > > > - > > > > > > > - const codecs_t &client_codecs() { return codecs; } > > > > > > > - bool streaming_requested() { return is_streaming; } > > > > > > > - > > > > > > > - template <typename Message, typename ...PayloadArgs> > > > > > > > - void send(PayloadArgs... 
payload) > > > > > > > - { > > > > > > > - Message message(payload...); > > > > > > > - std::lock_guard<std::mutex> stream_guard(mutex); > > > > > > > - message.write_header(*this); > > > > > > > - message.write_message_body(*this, payload...); > > > > > > > - } > > > > > > > - > > > > > > > - int read_command(bool blocking); > > > > > > > - void write_all(const char *operation, const void *buf, const > > > > > > > size_t len); > > > > > > > - > > > > > > > -private: > > > > > > > - int have_something_to_read(int timeout); > > > > > > > - void handle_stream_start_stop(uint32_t len); > > > > > > > - void handle_stream_capabilities(uint32_t len); > > > > > > > - void handle_stream_error(uint32_t len); > > > > > > > - void read_command_from_device(void); > > > > > > > - > > > > > > > -private: > > > > > > > - std::mutex mutex; > > > > > > > - codecs_t codecs; > > > > > > > - int streamfd = -1; > > > > > > > - bool is_streaming = false; > > > > > > > -}; > > > > > > > - > > > > > > > -template <typename Payload, typename Info, unsigned Type> > > > > > > > -class Message > > > > > > > -{ > > > > > > > -public: > > > > > > > - template <typename ...PayloadArgs> > > > > > > > - Message(PayloadArgs... payload) > > > > > > > - : hdr(StreamDevHeader { > > > > > > > - .protocol_version = STREAM_DEVICE_PROTOCOL, > > > > > > > - .padding = 0, // Workaround GCC bug "sorry: > > > > > > > not implemented" > > > > > > > - .type = Type, > > > > > > > - .size = (uint32_t) Info::size(payload...) 
> > > > > > > - }) > > > > > > > - { } > > > > > > > - void write_header(Stream &stream) > > > > > > > - { > > > > > > > - stream.write_all("header", &hdr, sizeof(hdr)); > > > > > > > - } > > > > > > > - > > > > > > > -protected: > > > > > > > - StreamDevHeader hdr; > > > > > > > - typedef Payload payload_t; > > > > > > > -}; > > > > > > > - > > > > > > > class FormatMessage : public Message<StreamMsgFormat, > > > > > > > FormatMessage, STREAM_TYPE_FORMAT> > > > > > > > { > > > > > > > public: > > > > > > > @@ -156,20 +86,6 @@ public: > > > > > > > } > > > > > > > }; > > > > > > > > > > > > > > -class CapabilitiesMessage : public Message<StreamMsgData, > > > > > > > CapabilitiesMessage, STREAM_TYPE_CAPABILITIES> > > > > > > > -{ > > > > > > > -public: > > > > > > > - CapabilitiesMessage() : Message() {} > > > > > > > - static size_t size() > > > > > > > - { > > > > > > > - return sizeof(payload_t); > > > > > > > - } > > > > > > > - void write_message_body(Stream &stream) > > > > > > > - { > > > > > > > - /* No body for capabilities message */ > > > > > > > - } > > > > > > > -}; > > > > > > > - > > > > > > > class X11CursorMessage : public Message<StreamMsgCursorSet, > > > > > > > X11CursorMessage, STREAM_TYPE_CURSOR_SET> > > > > > > > { > > > > > > > public: > > > > > > > @@ -329,124 +245,7 @@ X11CursorThread::X11CursorThread(Stream > > > > > > > &stream) > > > > > > > > > > > > > > }} // namespace spice::streaming_agent > > > > > > > > > > > > > > -static bool quit_requested = false; > > > > > > > - > > > > > > > -int Stream::have_something_to_read(int timeout) > > > > > > > -{ > > > > > > > - struct pollfd pollfd = {streamfd, POLLIN, 0}; > > > > > > > - > > > > > > > - if (poll(&pollfd, 1, timeout) < 0) { > > > > > > > - syslog(LOG_ERR, "poll FAILED\n"); > > > > > > > - return -1; > > > > > > > - } > > > > > > > - > > > > > > > - if (pollfd.revents == POLLIN) { > > > > > > > - return 1; > > > > > > > - } > > > > > > > - > > > > > > > - return 0; > > > > > > > -} 
> > > > > > > - > > > > > > > -void Stream::handle_stream_start_stop(uint32_t len) > > > > > > > -{ > > > > > > > - uint8_t msg[256]; > > > > > > > - > > > > > > > - if (len >= sizeof(msg)) { > > > > > > > - throw MessageDataError("message is too long", len, > > > > > > > sizeof(msg)); > > > > > > > - } > > > > > > > - int n = read(streamfd, &msg, len); > > > > > > > - if (n != (int) len) { > > > > > > > - throw MessageDataError("read start/stop command from > > > > > > > device failed", n, len, errno); > > > > > > > - } > > > > > > > - is_streaming = (msg[0] != 0); /* num_codecs */ > > > > > > > - syslog(LOG_INFO, "GOT START_STOP message -- request to %s > > > > > > > streaming\n", > > > > > > > - is_streaming ? "START" : "STOP"); > > > > > > > - codecs.clear(); > > > > > > > - for (int i = 1; i <= msg[0]; ++i) { > > > > > > > - codecs.insert((SpiceVideoCodecType) msg[i]); > > > > > > > - } > > > > > > > -} > > > > > > > - > > > > > > > -void Stream::handle_stream_capabilities(uint32_t len) > > > > > > > -{ > > > > > > > - uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; > > > > > > > - > > > > > > > - if (len > sizeof(caps)) { > > > > > > > - throw MessageDataError("capability message too long", > > > > > > > len, sizeof(caps)); > > > > > > > - } > > > > > > > - int n = read(streamfd, caps, len); > > > > > > > - if (n != (int) len) { > > > > > > > - throw MessageDataError("read capabilities from device > > > > > > > failed", n, len, errno); > > > > > > > - } > > > > > > > - > > > > > > > - // we currently do not support extensions so just reply so > > > > > > > - send<CapabilitiesMessage>(); > > > > > > > -} > > > > > > > - > > > > > > > -void Stream::handle_stream_error(uint32_t len) > > > > > > > -{ > > > > > > > - // TODO read message and use it > > > > > > > - throw ProtocolError("got an error message from server"); > > > > > > > -} > > > > > > > - > > > > > > > -void Stream::read_command_from_device() > > > > > > > -{ > > > > > > > - StreamDevHeader hdr; > > 
> > > > > - int n; > > > > > > > - > > > > > > > - std::lock_guard<std::mutex> stream_guard(mutex); > > > > > > > - n = read(streamfd, &hdr, sizeof(hdr)); > > > > > > > - if (n != sizeof(hdr)) { > > > > > > > - throw MessageDataError("read command from device > > > > > > > failed", n, sizeof(hdr), errno); > > > > > > > - } > > > > > > > - if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { > > > > > > > - throw MessageDataError("bad protocol version", > > > > > > > hdr.protocol_version, STREAM_DEVICE_PROTOCOL); > > > > > > > - } > > > > > > > - > > > > > > > - switch (hdr.type) { > > > > > > > - case STREAM_TYPE_CAPABILITIES: > > > > > > > - return handle_stream_capabilities(hdr.size); > > > > > > > - case STREAM_TYPE_NOTIFY_ERROR: > > > > > > > - return handle_stream_error(hdr.size); > > > > > > > - case STREAM_TYPE_START_STOP: > > > > > > > - return handle_stream_start_stop(hdr.size); > > > > > > > - } > > > > > > > - throw MessageDataError("unknown message type", hdr.type, 0); > > > > > > > -} > > > > > > > - > > > > > > > -int Stream::read_command(bool blocking) > > > > > > > -{ > > > > > > > - int timeout = blocking?-1:0; > > > > > > > - while (!quit_requested) { > > > > > > > - if (!have_something_to_read(timeout)) { > > > > > > > - if (!blocking) { > > > > > > > - return 0; > > > > > > > - } > > > > > > > - sleep(1); > > > > > > > - continue; > > > > > > > - } > > > > > > > - read_command_from_device(); > > > > > > > - break; > > > > > > > - } > > > > > > > - > > > > > > > - return 1; > > > > > > > -} > > > > > > > - > > > > > > > -void Stream::write_all(const char *operation, const void *buf, > > > > > > > const size_t len) > > > > > > > -{ > > > > > > > - size_t written = 0; > > > > > > > - while (written < len) { > > > > > > > - int l = write(streamfd, (const char *) buf + written, > > > > > > > len - written); > > > > > > > - if (l < 0) { > > > > > > > - if (errno == EINTR) { > > > > > > > - continue; > > > > > > > - } > > > > > > > - throw 
WriteError("write failed", operation, > > > > > > > errno).syslog(); > > > > > > > - } > > > > > > > - written += l; > > > > > > > - } > > > > > > > - syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", > > > > > > > written); > > > > > > > -} > > > > > > > +bool quit_requested = false; > > > > > > > > > > > > > > static void handle_interrupt(int intr) > > > > > > > { > > > > > > > diff --git a/src/stream.cpp b/src/stream.cpp > > > > > > > new file mode 100644 > > > > > > > index 0000000..f756097 > > > > > > > --- /dev/null > > > > > > > +++ b/src/stream.cpp > > > > > > > @@ -0,0 +1,172 @@ > > > > > > > +/* Encapsulation of the stream used to communicate between agent > > > > > > > and server > > > > > > > + * > > > > > > > + * \copyright > > > > > > > + * Copyright 2018 Red Hat Inc. All rights reserved. > > > > > > > + */ > > > > > > > + > > > > > > > +#include "stream.hpp" > > > > > > > +#include "message.hpp" > > > > > > > + > > > > > > > +#include <spice/stream-device.h> > > > > > > > + > > > > > > > +#include <spice-streaming-agent/errors.hpp> > > > > > > > + > > > > > > > +#include <sys/types.h> > > > > > > > +#include <sys/stat.h> > > > > > > > +#include <fcntl.h> > > > > > > > +#include <poll.h> > > > > > > > +#include <syslog.h> > > > > > > > +#include <unistd.h> > > > > > > > + > > > > > > > +namespace spice > > > > > > > +{ > > > > > > > +namespace streaming_agent > > > > > > > +{ > > > > > > > + > > > > > > > +class CapabilitiesMessage : public Message<StreamMsgData, > > > > > > > CapabilitiesMessage, > > > > > > > + > > > > > > > STREAM_TYPE_CAPABILITIES> > > > > > > > +{ > > > > > > > +public: > > > > > > > + CapabilitiesMessage() : Message() {} > > > > > > > + static size_t size() > > > > > > > + { > > > > > > > + return sizeof(payload_t); > > > > > > > + } > > > > > > > + void write_message_body(Stream &stream) > > > > > > > + { > > > > > > > + /* No body for capabilities message */ > > > > > > > + } > > > > > > > +}; > > > > > > > > > > > > Not 
sure I like scattering the messages across source files that > > > > > > happen > > > > > > to use them, though I suppose you did it because each message (like > > > > > > the > > > > > > X11Cursor) may require different header files included? Perhaps it > > > > > > is > > > > > > the way to go… > > > > > > > > > > No, it’s really to de-couple things, a good way to check if > > > > > encapsulation was correct. > > > > > > > > > > > > > > > > > > > > > > > + > > > > > > > +Stream::Stream(const char *name) > > > > > > > + : codecs() > > > > > > > +{ > > > > > > > + streamfd = open(name, O_RDWR); > > > > > > > + if (streamfd < 0) { > > > > > > > + throw IOError("failed to open streaming device", errno); > > > > > > > + } > > > > > > > +} > > > > > > > + > > > > > > > +Stream::~Stream() > > > > > > > +{ > > > > > > > + close(streamfd); > > > > > > > +} > > > > > > > + > > > > > > > +int Stream::have_something_to_read(int timeout) > > > > > > > +{ > > > > > > > + struct pollfd pollfd = {streamfd, POLLIN, 0}; > > > > > > > + > > > > > > > + if (poll(&pollfd, 1, timeout) < 0) { > > > > > > > + syslog(LOG_ERR, "poll FAILED\n"); > > > > > > > + return -1; > > > > > > > + } > > > > > > > + > > > > > > > + if (pollfd.revents == POLLIN) { > > > > > > > + return 1; > > > > > > > + } > > > > > > > + > > > > > > > + return 0; > > > > > > > +} > > > > > > > + > > > > > > > +void Stream::handle_stream_start_stop(uint32_t len) > > > > > > > +{ > > > > > > > + uint8_t msg[256]; > > > > > > > + > > > > > > > + if (len >= sizeof(msg)) { > > > > > > > + throw MessageDataError("message is too long", len, > > > > > > > sizeof(msg)); > > > > > > > + } > > > > > > > + int n = read(streamfd, &msg, len); > > > > > > > + if (n != (int) len) { > > > > > > > + throw MessageDataError("read start/stop command from > > > > > > > device failed", n, len, errno); > > > > > > > + } > > > > > > > + is_streaming = (msg[0] != 0); /* num_codecs */ > > > > > > > + syslog(LOG_INFO, "GOT START_STOP message -- 
request to %s > > > > > > > streaming\n", > > > > > > > + is_streaming ? "START" : "STOP"); > > > > > > > + codecs.clear(); > > > > > > > + for (int i = 1; i <= msg[0]; ++i) { > > > > > > > + codecs.insert((SpiceVideoCodecType) msg[i]); > > > > > > > + } > > > > > > > +} > > > > > > > + > > > > > > > +void Stream::handle_stream_capabilities(uint32_t len) > > > > > > > +{ > > > > > > > + uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES]; > > > > > > > + > > > > > > > + if (len > sizeof(caps)) { > > > > > > > + throw MessageDataError("capability message too long", > > > > > > > len, sizeof(caps)); > > > > > > > + } > > > > > > > + int n = read(streamfd, caps, len); > > > > > > > + if (n != (int) len) { > > > > > > > + throw MessageDataError("read capabilities from device > > > > > > > failed", n, len, errno); > > > > > > > + } > > > > > > > + > > > > > > > + // we currently do not support extensions so just reply so > > > > > > > + send<CapabilitiesMessage>(); > > > > > > > +} > > > > > > > + > > > > > > > +void Stream::handle_stream_error(uint32_t len) > > > > > > > +{ > > > > > > > + // TODO read message and use it > > > > > > > + throw ProtocolError("got an error message from server"); > > > > > > > +} > > > > > > > + > > > > > > > +void Stream::read_command_from_device() > > > > > > > +{ > > > > > > > + StreamDevHeader hdr; > > > > > > > + int n; > > > > > > > + > > > > > > > + std::lock_guard<std::mutex> stream_guard(mutex); > > > > > > > + n = read(streamfd, &hdr, sizeof(hdr)); > > > > > > > + if (n != sizeof(hdr)) { > > > > > > > + throw MessageDataError("read command from device > > > > > > > failed", n, sizeof(hdr), errno); > > > > > > > + } > > > > > > > + if (hdr.protocol_version != STREAM_DEVICE_PROTOCOL) { > > > > > > > + throw MessageDataError("bad protocol version", > > > > > > > hdr.protocol_version, STREAM_DEVICE_PROTOCOL); > > > > > > > + } > > > > > > > + > > > > > > > + switch (hdr.type) { > > > > > > > + case STREAM_TYPE_CAPABILITIES: > > > > > > > 
+ return handle_stream_capabilities(hdr.size); > > > > > > > + case STREAM_TYPE_NOTIFY_ERROR: > > > > > > > + return handle_stream_error(hdr.size); > > > > > > > + case STREAM_TYPE_START_STOP: > > > > > > > + return handle_stream_start_stop(hdr.size); > > > > > > > + } > > > > > > > + throw MessageDataError("unknown message type", hdr.type, 0); > > > > > > > +} > > > > > > > + > > > > > > > +int Stream::read_command(bool blocking) > > > > > > > +{ > > > > > > > + int timeout = blocking?-1:0; > > > > > > > + while (!quit_requested) { > > > > > > > + if (!have_something_to_read(timeout)) { > > > > > > > + if (!blocking) { > > > > > > > + return 0; > > > > > > > + } > > > > > > > + sleep(1); > > > > > > > + continue; > > > > > > > + } > > > > > > > + read_command_from_device(); > > > > > > > + break; > > > > > > > + } > > > > > > > + > > > > > > > + return 1; > > > > > > > +} > > > > > > > + > > > > > > > +void Stream::write_all(const char *operation, const void *buf, > > > > > > > const size_t len) > > > > > > > +{ > > > > > > > + size_t written = 0; > > > > > > > + while (written < len) { > > > > > > > + int l = write(streamfd, (const char *) buf + written, > > > > > > > len - written); > > > > > > > + if (l < 0) { > > > > > > > + if (errno == EINTR) { > > > > > > > + continue; > > > > > > > + } > > > > > > > + throw WriteError("write failed", operation, > > > > > > > errno).syslog(); > > > > > > > + } > > > > > > > + written += l; > > > > > > > + } > > > > > > > + syslog(LOG_DEBUG, "write_all -- %zu bytes written\n", > > > > > > > written); > > > > > > > +} > > > > > > > + > > > > > > > +}} // namespace spice::streaming_agent > > > > > > > diff --git a/src/stream.hpp b/src/stream.hpp > > > > > > > new file mode 100644 > > > > > > > index 0000000..b689f36 > > > > > > > --- /dev/null > > > > > > > +++ b/src/stream.hpp > > > > > > > @@ -0,0 +1,55 @@ > > > > > > > +/* Encapsulation of the stream used to communicate between agent > > > > > > > and server > > > > > > > + 
* > > > > > > > + * \copyright > > > > > > > + * Copyright 2018 Red Hat Inc. All rights reserved. > > > > > > > + */ > > > > > > > +#ifndef SPICE_STREAMING_AGENT_STREAM_HPP > > > > > > > +#define SPICE_STREAMING_AGENT_STREAM_HPP > > > > > > > + > > > > > > > +#include <spice/enums.h> > > > > > > > +#include <set> > > > > > > > +#include <mutex> > > > > > > > + > > > > > > > +namespace spice { > > > > > > > +namespace streaming_agent { > > > > > > > + > > > > > > > +class Stream > > > > > > > +{ > > > > > > > + typedef std::set<SpiceVideoCodecType> codecs_t; > > > > > > > + > > > > > > > +public: > > > > > > > + Stream(const char *name); > > > > > > > + ~Stream(); > > > > > > > + > > > > > > > + const codecs_t &client_codecs() { return codecs; } > > > > > > > + bool streaming_requested() { return is_streaming; } > > > > > > > + > > > > > > > + template <typename Message, typename ...PayloadArgs> > > > > > > > + void send(PayloadArgs... payload) > > > > > > > + { > > > > > > > + Message message(payload...); > > > > > > > + std::lock_guard<std::mutex> stream_guard(mutex); > > > > > > > + message.write_header(*this); > > > > > > > + message.write_message_body(*this, payload...); > > > > > > > + } > > > > > > > + > > > > > > > + int read_command(bool blocking); > > > > > > > + void write_all(const char *operation, const void *buf, const > > > > > > > size_t len); > > > > > > > + > > > > > > > +private: > > > > > > > + int have_something_to_read(int timeout); > > > > > > > + void handle_stream_start_stop(uint32_t len); > > > > > > > + void handle_stream_capabilities(uint32_t len); > > > > > > > + void handle_stream_error(uint32_t len); > > > > > > > + void read_command_from_device(void); > > > > > > > + > > > > > > > +private: > > > > > > > + std::mutex mutex; > > > > > > > + codecs_t codecs; > > > > > > > + int streamfd = -1; > > > > > > > + bool is_streaming = false; > > > > > > > +}; > > > > > > > + > > > > > > > +}} // namespace spice::streaming_agent > > > > > > > + 
> > > > > > > +#endif // SPICE_STREAMING_AGENT_ERRORS_HPP
> >
_______________________________________________
Spice-devel mailing list
Spice-devel@lists.freedesktop.org
https://lists.freedesktop.org/mailman/listinfo/spice-devel