[Spice-devel] [PATCH 11/17] Move read, write and locking into the 'Stream' class

Lukáš Hrázký lhrazky at redhat.com
Tue Feb 20 13:29:47 UTC 2018


On Tue, 2018-02-20 at 10:47 +0100, Christophe de Dinechin wrote:
> > On 20 Feb 2018, at 10:43, Lukáš Hrázký <lhrazky at redhat.com> wrote:
> > 
> > On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
> > > From: Christophe de Dinechin <dinechin at redhat.com>
> > > 
> > > Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
> > > ---
> > > src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++--------------------
> > > 1 file changed, 47 insertions(+), 39 deletions(-)
> > > 
> > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> > > index f0d79ae..a989ee7 100644
> > > --- a/src/spice-streaming-agent.cpp
> > > +++ b/src/spice-streaming-agent.cpp
> > > @@ -71,18 +71,30 @@ class Stream
> > > public:
> > >     Stream(const char *name)
> > 
> > I would like to name the class something more descriptive for what it
> > is becoming. Class named Stream in a file named "stream.{cpp,hpp}"
> > could be almost anything.
> 
> But it’s not named “Stream”, it’s called “spice::streaming_agent::Stream” ;-)
> 
> I chose short names because I was in that namespace. Otherwise, I agree with you.
> 
> Do you think that the name is still too vague even within the namespace?

I think so. Everything in streaming agent is in that namespace, even if
it wasn't, you know you're looking at the streaming agent code and
think of the types in that context. Stream is still a pretty generic
name, I suppose you could imagine a number of things under it.

So what is this class in our case. It handles the socket communication
over the streaming channel to the server. Is it accurate to call it a
channel here? If so, maybe StreamingChannel?

> > My best name is StreamDispatcher so far, not
> > entirely happy about it :)
> 
> 
> > 
> > >     {
> > > -        fd = open(name, O_RDWR);
> > > -        if (fd < 0)
> > > +        streamfd = open(name, O_RDWR);
> > > +        if (streamfd < 0)
> > >             throw std::runtime_error("failed to open streaming device");
> > >     }
> > >     ~Stream()
> > >     {
> > > -        close(fd);
> > > +        close(streamfd);
> > >     }
> > > -    operator int() { return fd; }
> > > +    operator int() { return streamfd; }
> > > +
> > > +    int have_something_to_read(int timeout);
> > > +    int read_command_from_device(void);
> > > +    int read_command(bool blocking);
> > > +
> > > +    size_t write_all(const void *buf, const size_t len);
> > 
> > This method could also use a better name. write_bytes()?
> > write_buffer()?
> 
> I intended to do a rename in a follow up. My current choice was “write_packet”, because precisely, it’s not writing bytes or a buffer, it’s making sure the whole packet gets sent.

What do you mean by packet here? Does it have a specific meaning in
this context? It just sends an array of binary data, doesn't it? Like
later on you use it to write the header and message body separately.

> > 
> > > +    int send_format(unsigned w, unsigned h, uint8_t c);
> > > +    int send_frame(const void *buf, const unsigned size);
> > > +    void send_cursor(uint16_t width, uint16_t height,
> > > +                     uint16_t hotspot_x, uint16_t hotspot_y,
> > > +                     std::function<void(uint32_t *)> fill_cursor);
> > > 
> > > private:
> > > -    int fd = -1;
> > > +    int streamfd = -1;
> > > +    std::mutex mutex;
> > > };
> > > 
> > > }} // namespace spice::streaming_agent
> > > @@ -92,9 +104,8 @@ static bool streaming_requested = false;
> > > static bool quit_requested = false;
> > > static bool log_binary = false;
> > > static std::set<SpiceVideoCodecType> client_codecs;
> > > -static std::mutex stream_mtx;
> > > 
> > > -static int have_something_to_read(int streamfd, int timeout)
> > > +int Stream::have_something_to_read(int timeout)
> > > {
> > >     struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > 
> > > @@ -110,13 +121,13 @@ static int have_something_to_read(int streamfd, int timeout)
> > >     return 0;
> > > }
> > > 
> > > -static int read_command_from_device(int streamfd)
> > > +int Stream::read_command_from_device()
> > > {
> > >     StreamDevHeader hdr;
> > >     uint8_t msg[64];
> > >     int n;
> > > 
> > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > >     n = read(streamfd, &hdr, sizeof(hdr));
> > >     if (n != sizeof(hdr)) {
> > >         syslog(LOG_WARNING,
> > > @@ -155,29 +166,28 @@ static int read_command_from_device(int streamfd)
> > >     return 1;
> > > }
> > > 
> > > -static int read_command(int streamfd, bool blocking)
> > > +int Stream::read_command(bool blocking)
> > > {
> > >     int timeout = blocking?-1:0;
> > >     while (!quit_requested) {
> > > -        if (!have_something_to_read(streamfd, timeout)) {
> > > +        if (!have_something_to_read(timeout)) {
> > >             if (!blocking) {
> > >                 return 0;
> > >             }
> > >             sleep(1);
> > >             continue;
> > >         }
> > > -        return read_command_from_device(streamfd);
> > > +        return read_command_from_device();
> > >     }
> > > 
> > >     return 1;
> > > }
> > > 
> > > -static size_t
> > > -write_all(int fd, const void *buf, const size_t len)
> > > +size_t Stream::write_all(const void *buf, const size_t len)
> > > {
> > >     size_t written = 0;
> > >     while (written < len) {
> > > -        int l = write(fd, (const char *) buf + written, len - written);
> > > +        int l = write(streamfd, (const char *) buf + written, len - written);
> > >         if (l < 0) {
> > >             if (errno == EINTR) {
> > >                 continue;
> > > @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const size_t len)
> > >     return written;
> > > }
> > > 
> > > -static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_t c)
> > > +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
> > > {
> > >     const size_t msgsize = sizeof(FormatMessage);
> > >     const size_t hdrsize  = sizeof(StreamDevHeader);
> > > @@ -210,14 +220,14 @@ static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_
> > >         }
> > >     };
> > >     syslog(LOG_DEBUG, "writing format\n");
> > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > +    if (write_all(&msg, msgsize) != msgsize) {
> > >         return EXIT_FAILURE;
> > >     }
> > >     return EXIT_SUCCESS;
> > > }
> > > 
> > > -static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned size)
> > > +int Stream::send_frame(const void *buf, const unsigned size)
> > > {
> > >     ssize_t n;
> > >     const size_t msgsize = sizeof(FormatMessage);
> > > @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
> > >         .msg = {}
> > >     };
> > > 
> > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > -    n = write_all(streamfd, &msg, msgsize);
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > +    n = write_all(&msg, msgsize);
> > >     syslog(LOG_DEBUG,
> > >            "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
> > >            n, msg.hdr.size);
> > > @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
> > >                n, msgsize);
> > >         return EXIT_FAILURE;
> > >     }
> > > -    n = write_all(streamfd, buf, size);
> > > +    n = write_all(buf, size);
> > >     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
> > >     if (n != size) {
> > >         syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
> > > @@ -294,11 +304,10 @@ static void usage(const char *progname)
> > >     exit(1);
> > > }
> > > 
> > > -static void
> > > -send_cursor(int streamfd,
> > > -            uint16_t width, uint16_t height,
> > > -            uint16_t hotspot_x, uint16_t hotspot_y,
> > > -            std::function<void(uint32_t *)> fill_cursor)
> > > +void
> > > +Stream::send_cursor(uint16_t width, uint16_t height,
> > > +                    uint16_t hotspot_x, uint16_t hotspot_y,
> > > +                    std::function<void(uint32_t *)> fill_cursor)
> > > {
> > >     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
> > >         height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
> > > @@ -332,11 +341,11 @@ send_cursor(int streamfd,
> > >     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
> > >     fill_cursor(pixels);
> > > 
> > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > -    write_all(streamfd, storage.get(), cursor_msgsize);
> > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > +    write_all(storage.get(), cursor_msgsize);
> > > }
> > > 
> > > -static void cursor_changes(int streamfd, Display *display, int event_base)
> > > +static void cursor_changes(Stream *stream, Display *display, int event_base)
> > > {
> > >     unsigned long last_serial = 0;
> > > 
> > > @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd, Display *display, int event_base)
> > >             for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
> > >                 pixels[i] = cursor->pixels[i];
> > >         };
> > > -        send_cursor(streamfd,
> > > -                    cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
> > > +        stream->send_cursor(cursor->width, cursor->height,
> > > +                            cursor->xhot, cursor->yhot, fill_cursor);
> > >     }
> > > }
> > > 
> > > static void
> > > -do_capture(int streamfd, const char *streamport, FILE *f_log)
> > > +do_capture(Stream &stream, const char *streamport, FILE *f_log)
> > > {
> > >     unsigned int frame_count = 0;
> > >     while (!quit_requested) {
> > >         while (!quit_requested && !streaming_requested) {
> > > -            if (read_command(streamfd, true) < 0) {
> > > +            if (stream.read_command(true) < 0) {
> > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > >                 return;
> > >             }
> > > @@ -413,7 +422,7 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
> > > 
> > >                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height, codec);
> > > 
> > > -                if (spice_stream_send_format(streamfd, width, height, codec) == EXIT_FAILURE)
> > > +                if (stream.send_format(width, height, codec) == EXIT_FAILURE)
> > >                     throw std::runtime_error("FAILED to send format message");
> > >             }
> > >             if (f_log) {
> > > @@ -425,13 +434,12 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
> > >                     hexdump(frame.buffer, frame.buffer_size, f_log);
> > >                 }
> > >             }
> > > -            if (spice_stream_send_frame(streamfd,
> > > -                                        frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
> > > +            if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
> > >                 syslog(LOG_ERR, "FAILED to send a frame\n");
> > >                 break;
> > >             }
> > >             //usleep(1);
> > > -            if (read_command(streamfd, false) < 0) {
> > > +            if (stream.read_command(false) < 0) {
> > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > >                 return;
> > >             }
> > > @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
> > >     XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
> > > 
> > >     Stream streamfd(streamport);
> > > -    std::thread cursor_th(cursor_changes, (int) streamfd, display, event_base);
> > > +    std::thread cursor_th(cursor_changes, &streamfd, display, event_base);
> > >     cursor_th.detach();
> > > 
> > >     int ret = EXIT_SUCCESS;
> 
> 


More information about the Spice-devel mailing list