[Spice-devel] [PATCH 11/17] Move read, write and locking into the 'Stream' class

Christophe de Dinechin cdupontd at redhat.com
Wed Feb 21 09:08:21 UTC 2018



> On 20 Feb 2018, at 22:24, Jonathon Jongsma <jjongsma at redhat.com> wrote:
> 
> On Tue, 2018-02-20 at 14:29 +0100, Lukáš Hrázký wrote:
>> On Tue, 2018-02-20 at 10:47 +0100, Christophe de Dinechin wrote:
>>>> On 20 Feb 2018, at 10:43, Lukáš Hrázký <lhrazky at redhat.com>
>>>> wrote:
>>>> 
>>>> On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
>>>>> From: Christophe de Dinechin <dinechin at redhat.com>
>>>>> 
>>>>> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
>>>>> ---
>>>>> src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++-----
>>>>> ---------------
>>>>> 1 file changed, 47 insertions(+), 39 deletions(-)
>>>>> 
>>>>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-
>>>>> streaming-agent.cpp
>>>>> index f0d79ae..a989ee7 100644
>>>>> --- a/src/spice-streaming-agent.cpp
>>>>> +++ b/src/spice-streaming-agent.cpp
>>>>> @@ -71,18 +71,30 @@ class Stream
>>>>> public:
>>>>>    Stream(const char *name)
>>>> 
>>>> I would like to name the class something more descriptive of what it
>>>> is becoming. A class named Stream in a file named "stream.{cpp,hpp}"
>>>> could be almost anything.
>>> 
>>> But it’s not named “Stream”, it’s called
>>> “spice::streaming_agent::Stream” ;-)
>>> 
>>> I chose short names because I was in that namespace. Otherwise, I
>>> agree with you.
>>> 
>>> Do you think that the name is still too vague even within the
>>> namespace?
>> 
>> I think so. Everything in the streaming agent is in that namespace, and
>> even if it weren't, you know you're looking at the streaming agent code
>> and think of the types in that context. Stream is still a pretty generic
>> name; I suppose you could imagine a number of things under it.
>> 
>> So what is this class in our case? It handles the socket communication
>> over the streaming channel to the server. Is it accurate to call it a
>> channel here? If so, maybe StreamingChannel?
> 
> It's true that "Stream" in the context of the streaming agent could
> make you think that it was actually representing the encoded video
> stream, rather than an encapsulation of a communication channel. But I
> don't like the name StreamDispatcher. Glib uses the name GIOChannel for
> something similar. Maybe IOChannel?

I like the fact that IOChannel points out that it’s both input and output. Sounds good to me.
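
To make the naming concrete, here is a rough sketch of how the class could look under the IOChannel name. It is only an illustration, not the actual follow-up patch: the constructor, destructor and mutex mirror what the patch has under the Stream name, while the is_open() helper, the null-pointer assert and the deleted copy operations are assumptions added for the example.

```cpp
#include <fcntl.h>
#include <unistd.h>

#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical sketch: "IOChannel" is only the name proposed in this
// thread; nothing below exists in the tree under that name.
class IOChannel
{
public:
    explicit IOChannel(const char *name)
    {
        assert(name != nullptr);
        fd = open(name, O_RDWR);
        if (fd < 0)
            throw std::runtime_error("failed to open streaming device");
    }
    ~IOChannel()
    {
        close(fd);
    }

    // The channel owns the descriptor, so copying is disabled
    // (assumption, the patch does not do this).
    IOChannel(const IOChannel &) = delete;
    IOChannel &operator=(const IOChannel &) = delete;

    // Illustrative helper, not part of the patch.
    bool is_open() const { return fd >= 0; }

private:
    int fd = -1;
    std::mutex mutex; // would serialize reads and writes on fd
};
```

With the descriptor and the mutex both private, callers would go through member functions for input and output alike, which is what the name is meant to convey.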

> 
>> 
>>>> My best name is StreamDispatcher so far, not
>>>> entirely happy about it :)
>>> 
>>> 
>>>> 
>>>>>    {
>>>>> -        fd = open(name, O_RDWR);
>>>>> -        if (fd < 0)
>>>>> +        streamfd = open(name, O_RDWR);
>>>>> +        if (streamfd < 0)
>>>>>            throw std::runtime_error("failed to open streaming
>>>>> device");
>>>>>    }
>>>>>    ~Stream()
>>>>>    {
>>>>> -        close(fd);
>>>>> +        close(streamfd);
>>>>>    }
>>>>> -    operator int() { return fd; }
>>>>> +    operator int() { return streamfd; }
>>>>> +
>>>>> +    int have_something_to_read(int timeout);
>>>>> +    int read_command_from_device(void);
>>>>> +    int read_command(bool blocking);
>>>>> +
>>>>> +    size_t write_all(const void *buf, const size_t len);
>>>> 
>>>> This method could also use a better name. write_bytes()?
>>>> write_buffer()?
>>> 
>>> I intended to do a rename in a follow up. My current choice was
>>> “write_packet”, because precisely, it’s not writing bytes or a
>>> buffer, it’s making sure the whole packet gets sent.
>> 
>> What do you mean by packet here? Does it have a specific meaning in
>> this context? It just sends an array of binary data, doesn't it? Like
>> later on you use it to write the header and message body separately.
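
For reference, the behaviour under discussion can be sketched as a standalone function. This is a simplified version based on the parts of write_all visible in the diff: it takes the descriptor as a parameter so it can be tried in isolation, and the handling of errors other than EINTR (stop and return the short count) is an assumption, since that part of the function is not shown in the hunk.

```cpp
#include <unistd.h>

#include <cassert>
#include <cerrno>
#include <cstddef>

// Keeps calling write() until all len bytes have been handed to the
// kernel. Returns the number of bytes written, which is less than len
// only if write() failed with an error other than EINTR.
static size_t write_all(int fd, const void *buf, size_t len)
{
    assert(buf != nullptr || len == 0);
    size_t written = 0;
    while (written < len) {
        ssize_t l = write(fd, static_cast<const char *>(buf) + written,
                          len - written);
        if (l < 0) {
            if (errno == EINTR) {
                continue;   // interrupted before writing anything, retry
            }
            break;          // assumed: give up and report the short count
        }
        written += static_cast<size_t>(l);
    }
    return written;
}
```

Nothing in the loop knows about message boundaries: it is called once for the header and once for the body, and each call only guarantees that its own byte range is written completely or reported as short.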
>> 
>>>> 
>>>>> +    int send_format(unsigned w, unsigned h, uint8_t c);
>>>>> +    int send_frame(const void *buf, const unsigned size);
>>>>> +    void send_cursor(uint16_t width, uint16_t height,
>>>>> +                     uint16_t hotspot_x, uint16_t hotspot_y,
>>>>> +                     std::function<void(uint32_t *)>
>>>>> fill_cursor);
>>>>> 
>>>>> private:
>>>>> -    int fd = -1;
>>>>> +    int streamfd = -1;
>>>>> +    std::mutex mutex;
>>>>> };
>>>>> 
>>>>> }} // namespace spice::streaming_agent
>>>>> @@ -92,9 +104,8 @@ static bool streaming_requested = false;
>>>>> static bool quit_requested = false;
>>>>> static bool log_binary = false;
>>>>> static std::set<SpiceVideoCodecType> client_codecs;
>>>>> -static std::mutex stream_mtx;
>>>>> 
>>>>> -static int have_something_to_read(int streamfd, int timeout)
>>>>> +int Stream::have_something_to_read(int timeout)
>>>>> {
>>>>>    struct pollfd pollfd = {streamfd, POLLIN, 0};
>>>>> 
>>>>> @@ -110,13 +121,13 @@ static int have_something_to_read(int
>>>>> streamfd, int timeout)
>>>>>    return 0;
>>>>> }
>>>>> 
>>>>> -static int read_command_from_device(int streamfd)
>>>>> +int Stream::read_command_from_device()
>>>>> {
>>>>>    StreamDevHeader hdr;
>>>>>    uint8_t msg[64];
>>>>>    int n;
>>>>> 
>>>>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>>    n = read(streamfd, &hdr, sizeof(hdr));
>>>>>    if (n != sizeof(hdr)) {
>>>>>        syslog(LOG_WARNING,
>>>>> @@ -155,29 +166,28 @@ static int read_command_from_device(int
>>>>> streamfd)
>>>>>    return 1;
>>>>> }
>>>>> 
>>>>> -static int read_command(int streamfd, bool blocking)
>>>>> +int Stream::read_command(bool blocking)
>>>>> {
>>>>>    int timeout = blocking?-1:0;
>>>>>    while (!quit_requested) {
>>>>> -        if (!have_something_to_read(streamfd, timeout)) {
>>>>> +        if (!have_something_to_read(timeout)) {
>>>>>            if (!blocking) {
>>>>>                return 0;
>>>>>            }
>>>>>            sleep(1);
>>>>>            continue;
>>>>>        }
>>>>> -        return read_command_from_device(streamfd);
>>>>> +        return read_command_from_device();
>>>>>    }
>>>>> 
>>>>>    return 1;
>>>>> }
>>>>> 
>>>>> -static size_t
>>>>> -write_all(int fd, const void *buf, const size_t len)
>>>>> +size_t Stream::write_all(const void *buf, const size_t len)
>>>>> {
>>>>>    size_t written = 0;
>>>>>    while (written < len) {
>>>>> -        int l = write(fd, (const char *) buf + written, len -
>>>>> written);
>>>>> +        int l = write(streamfd, (const char *) buf + written,
>>>>> len - written);
>>>>>        if (l < 0) {
>>>>>            if (errno == EINTR) {
>>>>>                continue;
>>>>> @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const
>>>>> size_t len)
>>>>>    return written;
>>>>> }
>>>>> 
>>>>> -static int spice_stream_send_format(int streamfd, unsigned w,
>>>>> unsigned h, uint8_t c)
>>>>> +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
>>>>> {
>>>>>    const size_t msgsize = sizeof(FormatMessage);
>>>>>    const size_t hdrsize  = sizeof(StreamDevHeader);
>>>>> @@ -210,14 +220,14 @@ static int spice_stream_send_format(int
>>>>> streamfd, unsigned w, unsigned h, uint8_
>>>>>        }
>>>>>    };
>>>>>    syslog(LOG_DEBUG, "writing format\n");
>>>>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>>>>> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>> +    if (write_all(&msg, msgsize) != msgsize) {
>>>>>        return EXIT_FAILURE;
>>>>>    }
>>>>>    return EXIT_SUCCESS;
>>>>> }
>>>>> 
>>>>> -static int spice_stream_send_frame(int streamfd, const void
>>>>> *buf, const unsigned size)
>>>>> +int Stream::send_frame(const void *buf, const unsigned size)
>>>>> {
>>>>>    ssize_t n;
>>>>>    const size_t msgsize = sizeof(FormatMessage);
>>>>> @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int
>>>>> streamfd, const void *buf, const unsigned
>>>>>        .msg = {}
>>>>>    };
>>>>> 
>>>>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>>>>> -    n = write_all(streamfd, &msg, msgsize);
>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>> +    n = write_all(&msg, msgsize);
>>>>>    syslog(LOG_DEBUG,
>>>>>           "wrote %ld bytes of header of data msg with frame of
>>>>> size %u bytes\n",
>>>>>           n, msg.hdr.size);
>>>>> @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int
>>>>> streamfd, const void *buf, const unsigned
>>>>>               n, msgsize);
>>>>>        return EXIT_FAILURE;
>>>>>    }
>>>>> -    n = write_all(streamfd, buf, size);
>>>>> +    n = write_all(buf, size);
>>>>>    syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>>>>>    if (n != size) {
>>>>>        syslog(LOG_WARNING, "write_all header: wrote %ld
>>>>> expected %u\n",
>>>>> @@ -294,11 +304,10 @@ static void usage(const char *progname)
>>>>>    exit(1);
>>>>> }
>>>>> 
>>>>> -static void
>>>>> -send_cursor(int streamfd,
>>>>> -            uint16_t width, uint16_t height,
>>>>> -            uint16_t hotspot_x, uint16_t hotspot_y,
>>>>> -            std::function<void(uint32_t *)> fill_cursor)
>>>>> +void
>>>>> +Stream::send_cursor(uint16_t width, uint16_t height,
>>>>> +                    uint16_t hotspot_x, uint16_t hotspot_y,
>>>>> +                    std::function<void(uint32_t *)>
>>>>> fill_cursor)
>>>>> {
>>>>>    if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
>>>>>        height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
>>>>> @@ -332,11 +341,11 @@ send_cursor(int streamfd,
>>>>>    uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg-
>>>>>> msg.data);
>>>>>    fill_cursor(pixels);
>>>>> 
>>>>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>>>>> -    write_all(streamfd, storage.get(), cursor_msgsize);
>>>>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>>>> +    write_all(storage.get(), cursor_msgsize);
>>>>> }
>>>>> 
>>>>> -static void cursor_changes(int streamfd, Display *display, int
>>>>> event_base)
>>>>> +static void cursor_changes(Stream *stream, Display *display,
>>>>> int event_base)
>>>>> {
>>>>>    unsigned long last_serial = 0;
>>>>> 
>>>>> @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd,
>>>>> Display *display, int event_base)
>>>>>            for (unsigned i = 0; i < cursor->width * cursor-
>>>>>> height; ++i)
>>>>>                pixels[i] = cursor->pixels[i];
>>>>>        };
>>>>> -        send_cursor(streamfd,
>>>>> -                    cursor->width, cursor->height, cursor-
>>>>>> xhot, cursor->yhot, fill_cursor);
>>>>> +        stream->send_cursor(cursor->width, cursor->height,
>>>>> +                            cursor->xhot, cursor->yhot,
>>>>> fill_cursor);
>>>>>    }
>>>>> }
>>>>> 
>>>>> static void
>>>>> -do_capture(int streamfd, const char *streamport, FILE *f_log)
>>>>> +do_capture(Stream &stream, const char *streamport, FILE
>>>>> *f_log)
>>>>> {
>>>>>    unsigned int frame_count = 0;
>>>>>    while (!quit_requested) {
>>>>>        while (!quit_requested && !streaming_requested) {
>>>>> -            if (read_command(streamfd, true) < 0) {
>>>>> +            if (stream.read_command(true) < 0) {
>>>>>                syslog(LOG_ERR, "FAILED to read command\n");
>>>>>                return;
>>>>>            }
>>>>> @@ -413,7 +422,7 @@ do_capture(int streamfd, const char
>>>>> *streamport, FILE *f_log)
>>>>> 
>>>>>                syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n",
>>>>> width, height, codec);
>>>>> 
>>>>> -                if (spice_stream_send_format(streamfd, width,
>>>>> height, codec) == EXIT_FAILURE)
>>>>> +                if (stream.send_format(width, height, codec)
>>>>> == EXIT_FAILURE)
>>>>>                    throw std::runtime_error("FAILED to send
>>>>> format message");
>>>>>            }
>>>>>            if (f_log) {
>>>>> @@ -425,13 +434,12 @@ do_capture(int streamfd, const char
>>>>> *streamport, FILE *f_log)
>>>>>                    hexdump(frame.buffer, frame.buffer_size,
>>>>> f_log);
>>>>>                }
>>>>>            }
>>>>> -            if (spice_stream_send_frame(streamfd,
>>>>> -                                        frame.buffer,
>>>>> frame.buffer_size) == EXIT_FAILURE) {
>>>>> +            if (stream.send_frame(frame.buffer,
>>>>> frame.buffer_size) == EXIT_FAILURE) {
>>>>>                syslog(LOG_ERR, "FAILED to send a frame\n");
>>>>>                break;
>>>>>            }
>>>>>            //usleep(1);
>>>>> -            if (read_command(streamfd, false) < 0) {
>>>>> +            if (stream.read_command(false) < 0) {
>>>>>                syslog(LOG_ERR, "FAILED to read command\n");
>>>>>                return;
>>>>>            }
>>>>> @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
>>>>>    XFixesSelectCursorInput(display, rootwindow,
>>>>> XFixesDisplayCursorNotifyMask);
>>>>> 
>>>>>    Stream streamfd(streamport);
>>>>> -    std::thread cursor_th(cursor_changes, (int) streamfd,
>>>>> display, event_base);
>>>>> +    std::thread cursor_th(cursor_changes, &streamfd, display,
>>>>> event_base);
>>>>>    cursor_th.detach();
>>>>> 
>>>>>    int ret = EXIT_SUCCESS;
>>> 
>>> 
>> 
>> _______________________________________________
>> Spice-devel mailing list
>> Spice-devel at lists.freedesktop.org
>> https://lists.freedesktop.org/mailman/listinfo/spice-devel


