[Spice-devel] [PATCH v2 11/24] Move read, write and locking into the 'Stream' class

Frediano Ziglio fziglio at redhat.com
Thu Feb 22 09:08:38 UTC 2018


> 
> From: Christophe de Dinechin <dinechin at redhat.com>
> 
> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
> ---
>  src/spice-streaming-agent.cpp | 105
>  +++++++++++++++++++++++-------------------
>  1 file changed, 57 insertions(+), 48 deletions(-)
> 
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index 3c4fb67..9f4183c 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -41,8 +41,6 @@
>  
>  using namespace spice::streaming_agent;
>  
> -static size_t write_all(int fd, const void *buf, const size_t len);
> -
>  static ConcreteAgent agent;
>  
>  namespace spice
> @@ -73,18 +71,33 @@ class Stream
>  public:
>      Stream(const char *name)
>      {
> -        fd = open(name, O_RDWR);
> -        if (fd < 0)
> +        streamfd = open(name, O_RDWR);
> +        if (streamfd < 0)
>              throw std::runtime_error("failed to open streaming device");
>      }
>      ~Stream()
>      {
> -        close(fd);
> +        close(streamfd);
>      }
> -    operator int() { return fd; }
> +    operator int() { return streamfd; }
> +
> +    int have_something_to_read(int timeout);
> +    void handle_stream_start_stop(uint32_t len);
> +    void handle_stream_capabilities(uint32_t len);
> +    void handle_stream_error(uint32_t len);
> +    void read_command_from_device(void);

Are you sure you want all of this stuff to be public?
Taking into account that some of these are supposed to be called with
the mutex held, and that the mutex is controlled by Stream, it seems
wrong.

> +    int read_command(bool blocking);
> +
> +    size_t write_all(const void *buf, const size_t len);

This too should be private, for the same reason.

> +    int send_format(unsigned w, unsigned h, uint8_t c);
> +    int send_frame(const void *buf, const unsigned size);
> +    void send_cursor(uint16_t width, uint16_t height,
> +                     uint16_t hotspot_x, uint16_t hotspot_y,
> +                     std::function<void(uint32_t *)> fill_cursor);
>  
>  private:
> -    int fd = -1;
> +    int streamfd = -1;

It does not seem like a good idea to me that the file descriptor of Stream
is named Stream::streamfd instead of Stream::fd.

> +    std::mutex mutex;
>  };
>  
>  }} // namespace spice::streaming_agent
> @@ -94,9 +107,8 @@ static bool streaming_requested = false;
>  static bool quit_requested = false;
>  static bool log_binary = false;
>  static std::set<SpiceVideoCodecType> client_codecs;
> -static std::mutex stream_mtx;
>  
> -static int have_something_to_read(int streamfd, int timeout)
> +int Stream::have_something_to_read(int timeout)
>  {
>      struct pollfd pollfd = {streamfd, POLLIN, 0};
>  
> @@ -112,7 +124,7 @@ static int have_something_to_read(int streamfd, int
> timeout)
>      return 0;
>  }
>  
> -static void handle_stream_start_stop(int streamfd, uint32_t len)
> +void Stream::handle_stream_start_stop(uint32_t len)
>  {
>      uint8_t msg[256];
>  
> @@ -134,7 +146,7 @@ static void handle_stream_start_stop(int streamfd,
> uint32_t len)
>      }
>  }
>  
> -static void handle_stream_capabilities(int streamfd, uint32_t len)
> +void Stream::handle_stream_capabilities(uint32_t len)
>  {
>      uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>  
> @@ -154,23 +166,23 @@ static void handle_stream_capabilities(int streamfd,
> uint32_t len)
>          STREAM_TYPE_CAPABILITIES,
>          0
>      };
> -    if (write_all(streamfd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
> +    if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) {
>          throw std::runtime_error("error writing capabilities");
>      }
>  }
>  
> -static void handle_stream_error(int streamfd, uint32_t len)
> +void Stream::handle_stream_error(uint32_t len)
>  {
>      // TODO read message and use it
>      throw std::runtime_error("got an error message from server");
>  }
>  
> -static void read_command_from_device(int streamfd)
> +void Stream::read_command_from_device()
>  {
>      StreamDevHeader hdr;
>      int n;
>  
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
>      n = read(streamfd, &hdr, sizeof(hdr));
>      if (n != sizeof(hdr)) {
>          throw std::runtime_error("read command from device FAILED -- read "
>          + std::to_string(n) +
> @@ -183,39 +195,38 @@ static void read_command_from_device(int streamfd)
>  
>      switch (hdr.type) {
>      case STREAM_TYPE_CAPABILITIES:
> -        return handle_stream_capabilities(streamfd, hdr.size);
> +        return handle_stream_capabilities(hdr.size);
>      case STREAM_TYPE_NOTIFY_ERROR:
> -        return handle_stream_error(streamfd, hdr.size);
> +        return handle_stream_error(hdr.size);
>      case STREAM_TYPE_START_STOP:
> -        return handle_stream_start_stop(streamfd, hdr.size);
> +        return handle_stream_start_stop(hdr.size);
>      }
>      throw std::runtime_error("UNKNOWN msg of type " +
>      std::to_string(hdr.type));
>  }
>  
> -static int read_command(int streamfd, bool blocking)
> +int Stream::read_command(bool blocking)
>  {
>      int timeout = blocking?-1:0;
>      while (!quit_requested) {
> -        if (!have_something_to_read(streamfd, timeout)) {
> +        if (!have_something_to_read(timeout)) {
>              if (!blocking) {
>                  return 0;
>              }
>              sleep(1);
>              continue;
>          }
> -        read_command_from_device(streamfd);
> +        read_command_from_device();
>          break;
>      }
>  
>      return 1;
>  }
>  
> -static size_t
> -write_all(int fd, const void *buf, const size_t len)
> +size_t Stream::write_all(const void *buf, const size_t len)
>  {
>      size_t written = 0;
>      while (written < len) {
> -        int l = write(fd, (const char *) buf + written, len - written);
> +        int l = write(streamfd, (const char *) buf + written, len -
> written);
>          if (l < 0) {
>              if (errno == EINTR) {
>                  continue;
> @@ -229,7 +240,7 @@ write_all(int fd, const void *buf, const size_t len)
>      return written;
>  }
>  
> -static int spice_stream_send_format(int streamfd, unsigned w, unsigned h,
> uint8_t c)
> +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
>  {
>      const size_t msgsize = sizeof(FormatMessage);
>      const size_t hdrsize  = sizeof(StreamDevHeader);
> @@ -248,14 +259,14 @@ static int spice_stream_send_format(int streamfd,
> unsigned w, unsigned h, uint8_
>          }
>      };
>      syslog(LOG_DEBUG, "writing format\n");
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    if (write_all(&msg, msgsize) != msgsize) {
>          return EXIT_FAILURE;
>      }
>      return EXIT_SUCCESS;
>  }
>  
> -static int spice_stream_send_frame(int streamfd, const void *buf, const
> unsigned size)
> +int Stream::send_frame(const void *buf, const unsigned size)
>  {
>      ssize_t n;
>      const size_t msgsize = sizeof(FormatMessage);
> @@ -269,8 +280,8 @@ static int spice_stream_send_frame(int streamfd, const
> void *buf, const unsigned
>          .msg = {}
>      };
>  
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    n = write_all(streamfd, &msg, msgsize);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    n = write_all(&msg, msgsize);
>      syslog(LOG_DEBUG,
>             "wrote %ld bytes of header of data msg with frame of size %u
>             bytes\n",
>             n, msg.hdr.size);
> @@ -279,7 +290,7 @@ static int spice_stream_send_frame(int streamfd, const
> void *buf, const unsigned
>                 n, msgsize);
>          return EXIT_FAILURE;
>      }
> -    n = write_all(streamfd, buf, size);
> +    n = write_all(buf, size);
>      syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>      if (n != size) {
>          syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
> @@ -332,11 +343,10 @@ static void usage(const char *progname)
>      exit(1);
>  }
>  
> -static void
> -send_cursor(int streamfd,
> -            uint16_t width, uint16_t height,
> -            uint16_t hotspot_x, uint16_t hotspot_y,
> -            std::function<void(uint32_t *)> fill_cursor)
> +void
> +Stream::send_cursor(uint16_t width, uint16_t height,
> +                    uint16_t hotspot_x, uint16_t hotspot_y,
> +                    std::function<void(uint32_t *)> fill_cursor)
>  {
>      if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
>          height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
> @@ -370,11 +380,11 @@ send_cursor(int streamfd,
>      uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
>      fill_cursor(pixels);
>  
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    write_all(streamfd, storage.get(), cursor_msgsize);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    write_all(storage.get(), cursor_msgsize);
>  }
>  
> -static void cursor_changes(int streamfd, Display *display, int event_base)
> +static void cursor_changes(Stream *stream, Display *display, int event_base)
>  {
>      unsigned long last_serial = 0;
>  
> @@ -396,18 +406,18 @@ static void cursor_changes(int streamfd, Display
> *display, int event_base)
>              for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
>                  pixels[i] = cursor->pixels[i];
>          };
> -        send_cursor(streamfd,
> -                    cursor->width, cursor->height, cursor->xhot,
> cursor->yhot, fill_cursor);
> +        stream->send_cursor(cursor->width, cursor->height,
> +                            cursor->xhot, cursor->yhot, fill_cursor);
>      }
>  }
>  
>  static void
> -do_capture(int streamfd, const char *streamport, FILE *f_log)
> +do_capture(Stream &stream, const char *streamport, FILE *f_log)
>  {
>      unsigned int frame_count = 0;
>      while (!quit_requested) {
>          while (!quit_requested && !streaming_requested) {
> -            if (read_command(streamfd, true) < 0) {
> +            if (stream.read_command(true) < 0) {
>                  syslog(LOG_ERR, "FAILED to read command\n");
>                  return;
>              }
> @@ -451,7 +461,7 @@ do_capture(int streamfd, const char *streamport, FILE
> *f_log)
>  
>                  syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height,
>                  codec);
>  
> -                if (spice_stream_send_format(streamfd, width, height, codec)
> == EXIT_FAILURE)
> +                if (stream.send_format(width, height, codec) ==
> EXIT_FAILURE)
>                      throw std::runtime_error("FAILED to send format
>                      message");
>              }
>              if (f_log) {
> @@ -463,13 +473,12 @@ do_capture(int streamfd, const char *streamport, FILE
> *f_log)
>                      hexdump(frame.buffer, frame.buffer_size, f_log);
>                  }
>              }
> -            if (spice_stream_send_frame(streamfd,
> -                                        frame.buffer, frame.buffer_size) ==
> EXIT_FAILURE) {
> +            if (stream.send_frame(frame.buffer, frame.buffer_size) ==
> EXIT_FAILURE) {
>                  syslog(LOG_ERR, "FAILED to send a frame\n");
>                  break;
>              }
>              //usleep(1);
> -            if (read_command(streamfd, false) < 0) {
> +            if (stream.read_command(false) < 0) {
>                  syslog(LOG_ERR, "FAILED to read command\n");
>                  return;
>              }
> @@ -561,7 +570,7 @@ int main(int argc, char* argv[])
>      XFixesSelectCursorInput(display, rootwindow,
>      XFixesDisplayCursorNotifyMask);
>  
>      Stream streamfd(streamport);
> -    std::thread cursor_th(cursor_changes, (int) streamfd, display,
> event_base);
> +    std::thread cursor_th(cursor_changes, &streamfd, display, event_base);
>      cursor_th.detach();
>  
>      int ret = EXIT_SUCCESS;

Frediano


More information about the Spice-devel mailing list