[Spice-devel] [PATCH 11/17] Move read, write and locking into the 'Stream' class

Jonathon Jongsma jjongsma at redhat.com
Tue Feb 20 21:24:19 UTC 2018


On Tue, 2018-02-20 at 14:29 +0100, Lukáš Hrázký wrote:
> On Tue, 2018-02-20 at 10:47 +0100, Christophe de Dinechin wrote:
> > > On 20 Feb 2018, at 10:43, Lukáš Hrázký <lhrazky at redhat.com>
> > > wrote:
> > > 
> > > On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
> > > > From: Christophe de Dinechin <dinechin at redhat.com>
> > > > 
> > > > Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
> > > > ---
> > > > src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++--------------------
> > > > 1 file changed, 47 insertions(+), 39 deletions(-)
> > > > 
> > > > diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
> > > > index f0d79ae..a989ee7 100644
> > > > --- a/src/spice-streaming-agent.cpp
> > > > +++ b/src/spice-streaming-agent.cpp
> > > > @@ -71,18 +71,30 @@ class Stream
> > > > public:
> > > >     Stream(const char *name)
> > > 
> > > I would like to name the class something more descriptive for
> > > what it is becoming. A class named Stream in a file named
> > > "stream.{cpp,hpp}" could be almost anything.
> > 
> > But it’s not named “Stream”, it’s called
> > “spice::streaming_agent::Stream” ;-)
> > 
> > I chose short names because I was in that namespace. Otherwise, I
> > agree with you.
> > 
> > Do you think that the name is still too vague even within the
> > namespace?
> 
> I think so. Everything in the streaming agent is in that namespace,
> and even if it wasn't, you know you're looking at the streaming agent
> code and think of the types in that context. Stream is still a pretty
> generic name; I suppose you could imagine a number of things under it.
> 
> So what is this class in our case? It handles the socket
> communication over the streaming channel to the server. Is it
> accurate to call it a channel here? If so, maybe StreamingChannel?

It's true that "Stream" in the context of the streaming agent could
make you think that it was actually representing the encoded video
stream, rather than an encapsulation of a communication channel. But I
don't like the name StreamDispatcher. GLib uses the name GIOChannel for
something similar. Maybe IOChannel?
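
To make the suggestion a bit more concrete, here is a rough sketch of how
the class might read under that name. IOChannel and write_some() are
hypothetical names of mine, not anything from the patch, and the deleted
copy operations are my addition; the constructor, destructor and mutex
member just mirror what the patch already does.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical name: "IOChannel" is only one of the candidates discussed.
class IOChannel
{
public:
    explicit IOChannel(const char *name)
        : fd(open(name, O_RDWR))
    {
        if (fd < 0)
            throw std::runtime_error("failed to open streaming device");
    }
    ~IOChannel() { close(fd); }

    // The descriptor is owned by the object, so a copy would close it twice.
    IOChannel(const IOChannel &) = delete;
    IOChannel &operator=(const IOChannel &) = delete;

    // Hypothetical helper: a single write(2) call made under the channel
    // lock. Returns what write(2) returns (bytes written, or -1 on error).
    ssize_t write_some(const void *buf, size_t len)
    {
        assert(buf != nullptr || len == 0);
        std::lock_guard<std::mutex> guard(mutex);
        return write(fd, buf, len);
    }

private:
    int fd = -1;
    std::mutex mutex;
};
```

With a name like this, a call site such as channel.write_some(&msg, msgsize)
reads as I/O on a communication channel rather than as an operation on the
video stream itself.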

> 
> > > My best name so far is StreamDispatcher, but I'm not
> > > entirely happy about it :)
> > 
> > 
> > > 
> > > >     {
> > > > -        fd = open(name, O_RDWR);
> > > > -        if (fd < 0)
> > > > +        streamfd = open(name, O_RDWR);
> > > > +        if (streamfd < 0)
> > > >             throw std::runtime_error("failed to open streaming
> > > > device");
> > > >     }
> > > >     ~Stream()
> > > >     {
> > > > -        close(fd);
> > > > +        close(streamfd);
> > > >     }
> > > > -    operator int() { return fd; }
> > > > +    operator int() { return streamfd; }
> > > > +
> > > > +    int have_something_to_read(int timeout);
> > > > +    int read_command_from_device(void);
> > > > +    int read_command(bool blocking);
> > > > +
> > > > +    size_t write_all(const void *buf, const size_t len);
> > > 
> > > This method could also use a better name. write_bytes()?
> > > write_buffer()?
> > 
> > I intended to do a rename in a follow up. My current choice was
> > “write_packet”, because precisely, it’s not writing bytes or a
> > buffer, it’s making sure the whole packet gets sent.
> 
> What do you mean by packet here? Does it have a specific meaning in
> this context? It just sends an array of binary data, doesn't it? Like
> later on you use it to write the header and message body separately.
> 
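
On the "packet" question: as far as I can tell from the hunk further down,
write_all() only loops over write() until every byte has gone out, retrying
on EINTR. There is no framing in it. Below is a standalone sketch of that
loop, taking the fd as a parameter so it can be tried outside the class.
What the real function does on a non-EINTR error is elided in the quoted
hunk, so returning the partial count here is my assumption.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <unistd.h>

// Keep calling write(2) until len bytes are written or a real error occurs.
// Returns the number of bytes actually written (== len on success).
size_t write_all(int fd, const void *buf, size_t len)
{
    assert(buf != nullptr || len == 0);
    const char *p = static_cast<const char *>(buf);
    size_t written = 0;
    while (written < len) {
        ssize_t l = write(fd, p + written, len - written);
        if (l < 0) {
            if (errno == EINTR)
                continue;   // interrupted before writing anything: retry
            break;          // assumption: give up and report the partial count
        }
        written += static_cast<size_t>(l);   // short write: loop for the rest
    }
    return written;
}
```

Seen this way the function guarantees "all of the buffer", not "a packet",
which is why the header and the body can be sent with two separate calls.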
> > > 
> > > > +    int send_format(unsigned w, unsigned h, uint8_t c);
> > > > +    int send_frame(const void *buf, const unsigned size);
> > > > +    void send_cursor(uint16_t width, uint16_t height,
> > > > +                     uint16_t hotspot_x, uint16_t hotspot_y,
> > > > +                     std::function<void(uint32_t *)>
> > > > fill_cursor);
> > > > 
> > > > private:
> > > > -    int fd = -1;
> > > > +    int streamfd = -1;
> > > > +    std::mutex mutex;
> > > > };
> > > > 
> > > > }} // namespace spice::streaming_agent
> > > > @@ -92,9 +104,8 @@ static bool streaming_requested = false;
> > > > static bool quit_requested = false;
> > > > static bool log_binary = false;
> > > > static std::set<SpiceVideoCodecType> client_codecs;
> > > > -static std::mutex stream_mtx;
> > > > 
> > > > -static int have_something_to_read(int streamfd, int timeout)
> > > > +int Stream::have_something_to_read(int timeout)
> > > > {
> > > >     struct pollfd pollfd = {streamfd, POLLIN, 0};
> > > > 
> > > > @@ -110,13 +121,13 @@ static int have_something_to_read(int
> > > > streamfd, int timeout)
> > > >     return 0;
> > > > }
> > > > 
> > > > -static int read_command_from_device(int streamfd)
> > > > +int Stream::read_command_from_device()
> > > > {
> > > >     StreamDevHeader hdr;
> > > >     uint8_t msg[64];
> > > >     int n;
> > > > 
> > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > >     n = read(streamfd, &hdr, sizeof(hdr));
> > > >     if (n != sizeof(hdr)) {
> > > >         syslog(LOG_WARNING,
> > > > @@ -155,29 +166,28 @@ static int read_command_from_device(int
> > > > streamfd)
> > > >     return 1;
> > > > }
> > > > 
> > > > -static int read_command(int streamfd, bool blocking)
> > > > +int Stream::read_command(bool blocking)
> > > > {
> > > >     int timeout = blocking?-1:0;
> > > >     while (!quit_requested) {
> > > > -        if (!have_something_to_read(streamfd, timeout)) {
> > > > +        if (!have_something_to_read(timeout)) {
> > > >             if (!blocking) {
> > > >                 return 0;
> > > >             }
> > > >             sleep(1);
> > > >             continue;
> > > >         }
> > > > -        return read_command_from_device(streamfd);
> > > > +        return read_command_from_device();
> > > >     }
> > > > 
> > > >     return 1;
> > > > }
> > > > 
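
For anyone following the timeout logic here: poll() treats a negative
timeout as "wait indefinitely" and 0 as "return immediately", which is what
blocking?-1:0 selects. A minimal sketch of such a readiness check follows.
wait_readable() is a hypothetical name and the EINTR retry is my addition;
most of the body of have_something_to_read() is elided in the quoted hunk,
so this is not meant to reproduce it.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>
#include <unistd.h>

// Returns 1 if fd becomes readable within timeout_ms, 0 on timeout,
// -1 on error. timeout_ms < 0 waits indefinitely, 0 does not wait at all.
int wait_readable(int fd, int timeout_ms)
{
    assert(fd >= 0);
    struct pollfd pfd = {fd, POLLIN, 0};
    int rc;
    do {
        // Note: a retry after EINTR restarts the full timeout.
        rc = poll(&pfd, 1, timeout_ms);
    } while (rc < 0 && errno == EINTR);
    if (rc < 0)
        return -1;
    return (rc > 0 && (pfd.revents & POLLIN)) ? 1 : 0;
}
```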
> > > > -static size_t
> > > > -write_all(int fd, const void *buf, const size_t len)
> > > > +size_t Stream::write_all(const void *buf, const size_t len)
> > > > {
> > > >     size_t written = 0;
> > > >     while (written < len) {
> > > > -        int l = write(fd, (const char *) buf + written, len -
> > > > written);
> > > > +        int l = write(streamfd, (const char *) buf + written,
> > > > len - written);
> > > >         if (l < 0) {
> > > >             if (errno == EINTR) {
> > > >                 continue;
> > > > @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const
> > > > size_t len)
> > > >     return written;
> > > > }
> > > > 
> > > > -static int spice_stream_send_format(int streamfd, unsigned w,
> > > > unsigned h, uint8_t c)
> > > > +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
> > > > {
> > > >     const size_t msgsize = sizeof(FormatMessage);
> > > >     const size_t hdrsize  = sizeof(StreamDevHeader);
> > > > @@ -210,14 +220,14 @@ static int spice_stream_send_format(int
> > > > streamfd, unsigned w, unsigned h, uint8_
> > > >         }
> > > >     };
> > > >     syslog(LOG_DEBUG, "writing format\n");
> > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
> > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > +    if (write_all(&msg, msgsize) != msgsize) {
> > > >         return EXIT_FAILURE;
> > > >     }
> > > >     return EXIT_SUCCESS;
> > > > }
> > > > 
> > > > -static int spice_stream_send_frame(int streamfd, const void
> > > > *buf, const unsigned size)
> > > > +int Stream::send_frame(const void *buf, const unsigned size)
> > > > {
> > > >     ssize_t n;
> > > >     const size_t msgsize = sizeof(FormatMessage);
> > > > @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int
> > > > streamfd, const void *buf, const unsigned
> > > >         .msg = {}
> > > >     };
> > > > 
> > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > -    n = write_all(streamfd, &msg, msgsize);
> > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > +    n = write_all(&msg, msgsize);
> > > >     syslog(LOG_DEBUG,
> > > >            "wrote %ld bytes of header of data msg with frame of
> > > > size %u bytes\n",
> > > >            n, msg.hdr.size);
> > > > @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int
> > > > streamfd, const void *buf, const unsigned
> > > >                n, msgsize);
> > > >         return EXIT_FAILURE;
> > > >     }
> > > > -    n = write_all(streamfd, buf, size);
> > > > +    n = write_all(buf, size);
> > > >     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
> > > >     if (n != size) {
> > > >         syslog(LOG_WARNING, "write_all header: wrote %ld
> > > > expected %u\n",
> > > > @@ -294,11 +304,10 @@ static void usage(const char *progname)
> > > >     exit(1);
> > > > }
> > > > 
> > > > -static void
> > > > -send_cursor(int streamfd,
> > > > -            uint16_t width, uint16_t height,
> > > > -            uint16_t hotspot_x, uint16_t hotspot_y,
> > > > -            std::function<void(uint32_t *)> fill_cursor)
> > > > +void
> > > > +Stream::send_cursor(uint16_t width, uint16_t height,
> > > > +                    uint16_t hotspot_x, uint16_t hotspot_y,
> > > > +                    std::function<void(uint32_t *)>
> > > > fill_cursor)
> > > > {
> > > >     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
> > > >         height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
> > > > @@ -332,11 +341,11 @@ send_cursor(int streamfd,
> > > >     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
> > > >     fill_cursor(pixels);
> > > > 
> > > > -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
> > > > -    write_all(streamfd, storage.get(), cursor_msgsize);
> > > > +    std::lock_guard<std::mutex> stream_guard(mutex);
> > > > +    write_all(storage.get(), cursor_msgsize);
> > > > }
> > > > 
> > > > -static void cursor_changes(int streamfd, Display *display, int
> > > > event_base)
> > > > +static void cursor_changes(Stream *stream, Display *display,
> > > > int event_base)
> > > > {
> > > >     unsigned long last_serial = 0;
> > > > 
> > > > @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd,
> > > > Display *display, int event_base)
> > > >             for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
> > > >                 pixels[i] = cursor->pixels[i];
> > > >         };
> > > > -        send_cursor(streamfd,
> > > > -                    cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
> > > > +        stream->send_cursor(cursor->width, cursor->height,
> > > > +                            cursor->xhot, cursor->yhot,
> > > > fill_cursor);
> > > >     }
> > > > }
> > > > 
> > > > static void
> > > > -do_capture(int streamfd, const char *streamport, FILE *f_log)
> > > > +do_capture(Stream &stream, const char *streamport, FILE
> > > > *f_log)
> > > > {
> > > >     unsigned int frame_count = 0;
> > > >     while (!quit_requested) {
> > > >         while (!quit_requested && !streaming_requested) {
> > > > -            if (read_command(streamfd, true) < 0) {
> > > > +            if (stream.read_command(true) < 0) {
> > > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > > >                 return;
> > > >             }
> > > > @@ -413,7 +422,7 @@ do_capture(int streamfd, const char
> > > > *streamport, FILE *f_log)
> > > > 
> > > >                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n",
> > > > width, height, codec);
> > > > 
> > > > -                if (spice_stream_send_format(streamfd, width,
> > > > height, codec) == EXIT_FAILURE)
> > > > +                if (stream.send_format(width, height, codec)
> > > > == EXIT_FAILURE)
> > > >                     throw std::runtime_error("FAILED to send
> > > > format message");
> > > >             }
> > > >             if (f_log) {
> > > > @@ -425,13 +434,12 @@ do_capture(int streamfd, const char
> > > > *streamport, FILE *f_log)
> > > >                     hexdump(frame.buffer, frame.buffer_size,
> > > > f_log);
> > > >                 }
> > > >             }
> > > > -            if (spice_stream_send_frame(streamfd,
> > > > -                                        frame.buffer,
> > > > frame.buffer_size) == EXIT_FAILURE) {
> > > > +            if (stream.send_frame(frame.buffer,
> > > > frame.buffer_size) == EXIT_FAILURE) {
> > > >                 syslog(LOG_ERR, "FAILED to send a frame\n");
> > > >                 break;
> > > >             }
> > > >             //usleep(1);
> > > > -            if (read_command(streamfd, false) < 0) {
> > > > +            if (stream.read_command(false) < 0) {
> > > >                 syslog(LOG_ERR, "FAILED to read command\n");
> > > >                 return;
> > > >             }
> > > > @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
> > > >     XFixesSelectCursorInput(display, rootwindow,
> > > > XFixesDisplayCursorNotifyMask);
> > > > 
> > > >     Stream streamfd(streamport);
> > > > -    std::thread cursor_th(cursor_changes, (int) streamfd,
> > > > display, event_base);
> > > > +    std::thread cursor_th(cursor_changes, &streamfd, display,
> > > > event_base);
> > > >     cursor_th.detach();
> > > > 
> > > >     int ret = EXIT_SUCCESS;
> > 
> > 
> 