[Spice-devel] [PATCH 09/22] Move read, write, handle and locking into the 'Stream' class

Frediano Ziglio fziglio at redhat.com
Thu Mar 1 10:59:40 UTC 2018


> 
> From: Christophe de Dinechin <dinechin at redhat.com>
> 
> The 'Stream' class is designed to abstract file I/O. In a subsequent
> patch, message formatting will be isolated out of the class, but in
> order to minimize code changes, this intermediate step simply moves
> the corresponding functions within the Stream class.
> 
> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
> ---
>  src/spice-streaming-agent.cpp | 108
>  +++++++++++++++++++++++-------------------
>  1 file changed, 58 insertions(+), 50 deletions(-)
> 
> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> index 21f9c31..4d24234 100644
> --- a/src/spice-streaming-agent.cpp
> +++ b/src/spice-streaming-agent.cpp
> @@ -40,8 +40,6 @@
>  
>  using namespace spice::streaming_agent;
>  
> -static size_t write_all(int fd, const void *buf, const size_t len);
> -
>  static ConcreteAgent agent;
>  
>  namespace spice
> @@ -72,31 +70,44 @@ class Stream
>  public:
>      Stream(const char *name)
>      {
> -        fd = open(name, O_RDWR);
> -        if (fd < 0) {
> +        streamfd = open(name, O_RDWR);
> +        if (streamfd < 0) {

This is inside Stream, so there is no reason to call it "streamfd" there; "fd" was fine.

>              throw std::runtime_error("failed to open streaming device");
>          }
>      }
>      ~Stream()
>      {
> -        close(fd);
> +        close(streamfd);
>      }
> -    int file_descriptor() { return fd; }
> +
> +    int read_command(bool blocking);
> +    size_t write_all(const void *buf, const size_t len);

This should be private, or at most protected.

> +    int send_format(unsigned w, unsigned h, uint8_t c);
> +    int send_frame(const void *buf, const unsigned size);
> +    void send_cursor(uint16_t width, uint16_t height,
> +                     uint16_t hotspot_x, uint16_t hotspot_y,
> +                     std::function<void(uint32_t *)> fill_cursor);
> +
> +private:
> +    int have_something_to_read(int timeout);
> +    void handle_stream_start_stop(uint32_t len);
> +    void handle_stream_capabilities(uint32_t len);
> +    void handle_stream_error(uint32_t len);
> +    void read_command_from_device(void);
>  
>  private:
> -    int fd = -1;
> +    int streamfd = -1;
> +    std::mutex mutex;
>  };
>  
>  }} // namespace spice::streaming_agent
>  
> -

Spurious line removal; better to stick to 1-line spacing.

>  static bool streaming_requested = false;
>  static bool quit_requested = false;
>  static bool log_binary = false;
>  static std::set<SpiceVideoCodecType> client_codecs;
> -static std::mutex stream_mtx;
>  
> -static int have_something_to_read(int streamfd, int timeout)
> +int Stream::have_something_to_read(int timeout)
>  {
>      struct pollfd pollfd = {streamfd, POLLIN, 0};
>  
> @@ -112,7 +123,7 @@ static int have_something_to_read(int streamfd, int
> timeout)
>      return 0;
>  }
>  
> -static void handle_stream_start_stop(int streamfd, uint32_t len)
> +void Stream::handle_stream_start_stop(uint32_t len)
>  {
>      uint8_t msg[256];
>  
> @@ -134,7 +145,7 @@ static void handle_stream_start_stop(int streamfd,
> uint32_t len)
>      }
>  }
>  
> -static void handle_stream_capabilities(int streamfd, uint32_t len)
> +void Stream::handle_stream_capabilities(uint32_t len)
>  {
>      uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>  
> @@ -154,23 +165,23 @@ static void handle_stream_capabilities(int streamfd,
> uint32_t len)
>          STREAM_TYPE_CAPABILITIES,
>          0
>      };
> -    if (write_all(streamfd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
> +    if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) {
>          throw std::runtime_error("error writing capabilities");
>      }
>  }
>  
> -static void handle_stream_error(int streamfd, uint32_t len)
> +void Stream::handle_stream_error(uint32_t len)
>  {
>      // TODO read message and use it
>      throw std::runtime_error("got an error message from server");
>  }
>  
> -static void read_command_from_device(int streamfd)
> +void Stream::read_command_from_device()
>  {
>      StreamDevHeader hdr;
>      int n;
>  
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
>      n = read(streamfd, &hdr, sizeof(hdr));
>      if (n != sizeof(hdr)) {
>          throw std::runtime_error("read command from device FAILED -- read "
>          + std::to_string(n) +
> @@ -183,39 +194,38 @@ static void read_command_from_device(int streamfd)
>  
>      switch (hdr.type) {
>      case STREAM_TYPE_CAPABILITIES:
> -        return handle_stream_capabilities(streamfd, hdr.size);
> +        return handle_stream_capabilities(hdr.size);
>      case STREAM_TYPE_NOTIFY_ERROR:
> -        return handle_stream_error(streamfd, hdr.size);
> +        return handle_stream_error(hdr.size);
>      case STREAM_TYPE_START_STOP:
> -        return handle_stream_start_stop(streamfd, hdr.size);
> +        return handle_stream_start_stop(hdr.size);
>      }
>      throw std::runtime_error("UNKNOWN msg of type " +
>      std::to_string(hdr.type));
>  }
>  
> -static int read_command(int streamfd, bool blocking)
> +int Stream::read_command(bool blocking)
>  {
>      int timeout = blocking?-1:0;
>      while (!quit_requested) {
> -        if (!have_something_to_read(streamfd, timeout)) {
> +        if (!have_something_to_read(timeout)) {
>              if (!blocking) {
>                  return 0;
>              }
>              sleep(1);
>              continue;
>          }
> -        read_command_from_device(streamfd);
> +        read_command_from_device();
>          break;
>      }
>  
>      return 1;
>  }
>  
> -static size_t
> -write_all(int fd, const void *buf, const size_t len)
> +size_t Stream::write_all(const void *buf, const size_t len)
>  {
>      size_t written = 0;
>      while (written < len) {
> -        int l = write(fd, (const char *) buf + written, len - written);
> +        int l = write(streamfd, (const char *) buf + written, len -
> written);
>          if (l < 0) {
>              if (errno == EINTR) {
>                  continue;
> @@ -229,7 +239,7 @@ write_all(int fd, const void *buf, const size_t len)
>      return written;
>  }
>  
> -static int spice_stream_send_format(int streamfd, unsigned w, unsigned h,
> uint8_t c)
> +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
>  {
>      const size_t msgsize = sizeof(FormatMessage);
>      const size_t hdrsize  = sizeof(StreamDevHeader);
> @@ -248,14 +258,14 @@ static int spice_stream_send_format(int streamfd,
> unsigned w, unsigned h, uint8_
>          }
>      };
>      syslog(LOG_DEBUG, "writing format\n");
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    if (write_all(&msg, msgsize) != msgsize) {
>          return EXIT_FAILURE;
>      }
>      return EXIT_SUCCESS;
>  }
>  
> -static int spice_stream_send_frame(int streamfd, const void *buf, const
> unsigned size)
> +int Stream::send_frame(const void *buf, const unsigned size)
>  {
>      ssize_t n;
>      const size_t msgsize = sizeof(FormatMessage);
> @@ -269,8 +279,8 @@ static int spice_stream_send_frame(int streamfd, const
> void *buf, const unsigned
>          .msg = {}
>      };
>  
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    n = write_all(streamfd, &msg, msgsize);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    n = write_all(&msg, msgsize);
>      syslog(LOG_DEBUG,
>             "wrote %ld bytes of header of data msg with frame of size %u
>             bytes\n",
>             n, msg.hdr.size);
> @@ -279,7 +289,7 @@ static int spice_stream_send_frame(int streamfd, const
> void *buf, const unsigned
>                 n, msgsize);
>          return EXIT_FAILURE;
>      }
> -    n = write_all(streamfd, buf, size);
> +    n = write_all(buf, size);
>      syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>      if (n != size) {
>          syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
> @@ -333,11 +343,10 @@ static void usage(const char *progname)
>      exit(1);
>  }
>  
> -static void
> -send_cursor(int streamfd,
> -            uint16_t width, uint16_t height,
> -            uint16_t hotspot_x, uint16_t hotspot_y,
> -            std::function<void(uint32_t *)> fill_cursor)
> +void
> +Stream::send_cursor(uint16_t width, uint16_t height,
> +                    uint16_t hotspot_x, uint16_t hotspot_y,
> +                    std::function<void(uint32_t *)> fill_cursor)
>  {
>      if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >=
>      STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
>          return;
> @@ -370,11 +379,11 @@ send_cursor(int streamfd,
>      uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
>      fill_cursor(pixels);
>  
> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> -    write_all(streamfd, storage.get(), msgsize);
> +    std::lock_guard<std::mutex> stream_guard(mutex);
> +    write_all(storage.get(), msgsize);
>  }
>  
> -static void cursor_changes(int streamfd, Display *display, int event_base)
> +static void cursor_changes(Stream *stream, Display *display, int event_base)
>  {
>      unsigned long last_serial = 0;
>  
> @@ -399,18 +408,18 @@ static void cursor_changes(int streamfd, Display
> *display, int event_base)
>              for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
>                  pixels[i] = cursor->pixels[i];
>          };
> -        send_cursor(streamfd,
> -                    cursor->width, cursor->height, cursor->xhot,
> cursor->yhot, fill_cursor);
> +        stream->send_cursor(cursor->width, cursor->height,
> +                            cursor->xhot, cursor->yhot, fill_cursor);
>      }
>  }
>  
>  static void
> -do_capture(int streamfd, const char *streamport, FILE *f_log)
> +do_capture(Stream &stream, const char *streamport, FILE *f_log)
>  {
>      unsigned int frame_count = 0;
>      while (!quit_requested) {
>          while (!quit_requested && !streaming_requested) {
> -            if (read_command(streamfd, true) < 0) {
> +            if (stream.read_command(true) < 0) {
>                  syslog(LOG_ERR, "FAILED to read command\n");
>                  return;
>              }
> @@ -455,7 +464,7 @@ do_capture(int streamfd, const char *streamport, FILE
> *f_log)
>  
>                  syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height,
>                  codec);
>  
> -                if (spice_stream_send_format(streamfd, width, height, codec)
> == EXIT_FAILURE) {
> +                if (stream.send_format(width, height, codec) ==
> EXIT_FAILURE) {
>                      throw std::runtime_error("FAILED to send format
>                      message");
>                  }
>              }
> @@ -468,13 +477,12 @@ do_capture(int streamfd, const char *streamport, FILE
> *f_log)
>                      hexdump(frame.buffer, frame.buffer_size, f_log);
>                  }
>              }
> -            if (spice_stream_send_frame(streamfd,
> -                                        frame.buffer, frame.buffer_size) ==
> EXIT_FAILURE) {
> +            if (stream.send_frame(frame.buffer, frame.buffer_size) ==
> EXIT_FAILURE) {
>                  syslog(LOG_ERR, "FAILED to send a frame\n");
>                  break;
>              }
>              //usleep(1);
> -            if (read_command(streamfd, false) < 0) {
> +            if (stream.read_command(false) < 0) {
>                  syslog(LOG_ERR, "FAILED to read command\n");
>                  return;
>              }
> @@ -576,12 +584,12 @@ int main(int argc, char* argv[])
>      XFixesSelectCursorInput(display, rootwindow,
>      XFixesDisplayCursorNotifyMask);
>  
>      Stream stream(streamport);
> -    std::thread cursor_th(cursor_changes, stream.file_descriptor(), display,
> event_base);
> +    std::thread cursor_th(cursor_changes, &stream, display, event_base);
>      cursor_th.detach();
>  
>      int ret = EXIT_SUCCESS;
>      try {
> -        do_capture(stream.file_descriptor(), streamport, f_log);
> +        do_capture(stream, streamport, f_log);
>      }
>      catch (std::runtime_error &err) {
>          syslog(LOG_ERR, "%s\n", err.what());

Frediano


More information about the Spice-devel mailing list