[Spice-devel] [PATCH 09/22] Move read, write, handle and locking into the 'Stream' class

Christophe de Dinechin christophe.de.dinechin at gmail.com
Thu Mar 1 19:52:50 UTC 2018



> On 1 Mar 2018, at 11:59, Frediano Ziglio <fziglio at redhat.com> wrote:
> 
>> 
>> From: Christophe de Dinechin <dinechin at redhat.com>
>> 
>> The 'Stream' class is designed to abstract file I/O. In a subsequent
>> patch, message formatting will be isolated out of the class, but in
>> order to minimize code changes, this intermediate step simply moves
>> the corresponding functions within the Stream class.
>> 
>> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
>> ---
>> src/spice-streaming-agent.cpp | 108
>> +++++++++++++++++++++++-------------------
>> 1 file changed, 58 insertions(+), 50 deletions(-)
>> 
>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>> index 21f9c31..4d24234 100644
>> --- a/src/spice-streaming-agent.cpp
>> +++ b/src/spice-streaming-agent.cpp
>> @@ -40,8 +40,6 @@
>> 
>> using namespace spice::streaming_agent;
>> 
>> -static size_t write_all(int fd, const void *buf, const size_t len);
>> -
>> static ConcreteAgent agent;
>> 
>> namespace spice
>> @@ -72,31 +70,44 @@ class Stream
>> public:
>>     Stream(const char *name)
>>     {
>> -        fd = open(name, O_RDWR);
>> -        if (fd < 0) {
>> +        streamfd = open(name, O_RDWR);
>> +        if (streamfd < 0) {
> 
> is inside Stream, no reason to have a "streamfd" there, "fd" was fine.

While we have both ‘streamfd’ and ‘pollfd’ in the same file, I find the more explicit name clearer.

> 
>>             throw std::runtime_error("failed to open streaming device");
>>         }
>>     }
>>     ~Stream()
>>     {
>> -        close(fd);
>> +        close(streamfd);
>>     }
>> -    int file_descriptor() { return fd; }
>> +
>> +    int read_command(bool blocking);
>> +    size_t write_all(const void *buf, const size_t len);
> 
> this should be private, maximum protected. 

In later patches, these become the only public methods in Stream. I prefer the intermediate state to reflect the final intent.


> 
>> +    int send_format(unsigned w, unsigned h, uint8_t c);
>> +    int send_frame(const void *buf, const unsigned size);
>> +    void send_cursor(uint16_t width, uint16_t height,
>> +                     uint16_t hotspot_x, uint16_t hotspot_y,
>> +                     std::function<void(uint32_t *)> fill_cursor);
>> +
>> +private:
>> +    int have_something_to_read(int timeout);
>> +    void handle_stream_start_stop(uint32_t len);
>> +    void handle_stream_capabilities(uint32_t len);
>> +    void handle_stream_error(uint32_t len);
>> +    void read_command_from_device(void);
>> 
>> private:
>> -    int fd = -1;
>> +    int streamfd = -1;
>> +    std::mutex mutex;
>> };
>> 
>> }} // namespace spice::streaming_agent
>> 
>> -
> 
> spurious line removal, better to stick to 1 line spacing.

We can’t have both: removing this line is precisely what brings the file back to single-line spacing here.

> 
>> static bool streaming_requested = false;
>> static bool quit_requested = false;
>> static bool log_binary = false;
>> static std::set<SpiceVideoCodecType> client_codecs;
>> -static std::mutex stream_mtx;
>> 
>> -static int have_something_to_read(int streamfd, int timeout)
>> +int Stream::have_something_to_read(int timeout)
>> {
>>     struct pollfd pollfd = {streamfd, POLLIN, 0};
>> 
>> @@ -112,7 +123,7 @@ static int have_something_to_read(int streamfd, int
>> timeout)
>>     return 0;
>> }
>> 
>> -static void handle_stream_start_stop(int streamfd, uint32_t len)
>> +void Stream::handle_stream_start_stop(uint32_t len)
>> {
>>     uint8_t msg[256];
>> 
>> @@ -134,7 +145,7 @@ static void handle_stream_start_stop(int streamfd,
>> uint32_t len)
>>     }
>> }
>> 
>> -static void handle_stream_capabilities(int streamfd, uint32_t len)
>> +void Stream::handle_stream_capabilities(uint32_t len)
>> {
>>     uint8_t caps[STREAM_MSG_CAPABILITIES_MAX_BYTES];
>> 
>> @@ -154,23 +165,23 @@ static void handle_stream_capabilities(int streamfd,
>> uint32_t len)
>>         STREAM_TYPE_CAPABILITIES,
>>         0
>>     };
>> -    if (write_all(streamfd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
>> +    if (write_all(&hdr, sizeof(hdr)) != sizeof(hdr)) {
>>         throw std::runtime_error("error writing capabilities");
>>     }
>> }
>> 
>> -static void handle_stream_error(int streamfd, uint32_t len)
>> +void Stream::handle_stream_error(uint32_t len)
>> {
>>     // TODO read message and use it
>>     throw std::runtime_error("got an error message from server");
>> }
>> 
>> -static void read_command_from_device(int streamfd)
>> +void Stream::read_command_from_device()
>> {
>>     StreamDevHeader hdr;
>>     int n;
>> 
>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>>     n = read(streamfd, &hdr, sizeof(hdr));
>>     if (n != sizeof(hdr)) {
>>         throw std::runtime_error("read command from device FAILED -- read "
>>         + std::to_string(n) +
>> @@ -183,39 +194,38 @@ static void read_command_from_device(int streamfd)
>> 
>>     switch (hdr.type) {
>>     case STREAM_TYPE_CAPABILITIES:
>> -        return handle_stream_capabilities(streamfd, hdr.size);
>> +        return handle_stream_capabilities(hdr.size);
>>     case STREAM_TYPE_NOTIFY_ERROR:
>> -        return handle_stream_error(streamfd, hdr.size);
>> +        return handle_stream_error(hdr.size);
>>     case STREAM_TYPE_START_STOP:
>> -        return handle_stream_start_stop(streamfd, hdr.size);
>> +        return handle_stream_start_stop(hdr.size);
>>     }
>>     throw std::runtime_error("UNKNOWN msg of type " +
>>     std::to_string(hdr.type));
>> }
>> 
>> -static int read_command(int streamfd, bool blocking)
>> +int Stream::read_command(bool blocking)
>> {
>>     int timeout = blocking?-1:0;
>>     while (!quit_requested) {
>> -        if (!have_something_to_read(streamfd, timeout)) {
>> +        if (!have_something_to_read(timeout)) {
>>             if (!blocking) {
>>                 return 0;
>>             }
>>             sleep(1);
>>             continue;
>>         }
>> -        read_command_from_device(streamfd);
>> +        read_command_from_device();
>>         break;
>>     }
>> 
>>     return 1;
>> }
>> 
>> -static size_t
>> -write_all(int fd, const void *buf, const size_t len)
>> +size_t Stream::write_all(const void *buf, const size_t len)
>> {
>>     size_t written = 0;
>>     while (written < len) {
>> -        int l = write(fd, (const char *) buf + written, len - written);
>> +        int l = write(streamfd, (const char *) buf + written, len -
>> written);
>>         if (l < 0) {
>>             if (errno == EINTR) {
>>                 continue;
>> @@ -229,7 +239,7 @@ write_all(int fd, const void *buf, const size_t len)
>>     return written;
>> }
>> 
>> -static int spice_stream_send_format(int streamfd, unsigned w, unsigned h,
>> uint8_t c)
>> +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
>> {
>>     const size_t msgsize = sizeof(FormatMessage);
>>     const size_t hdrsize  = sizeof(StreamDevHeader);
>> @@ -248,14 +258,14 @@ static int spice_stream_send_format(int streamfd,
>> unsigned w, unsigned h, uint8_
>>         }
>>     };
>>     syslog(LOG_DEBUG, "writing format\n");
>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> -    if (write_all(streamfd, &msg, msgsize) != msgsize) {
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>> +    if (write_all(&msg, msgsize) != msgsize) {
>>         return EXIT_FAILURE;
>>     }
>>     return EXIT_SUCCESS;
>> }
>> 
>> -static int spice_stream_send_frame(int streamfd, const void *buf, const
>> unsigned size)
>> +int Stream::send_frame(const void *buf, const unsigned size)
>> {
>>     ssize_t n;
>>     const size_t msgsize = sizeof(FormatMessage);
>> @@ -269,8 +279,8 @@ static int spice_stream_send_frame(int streamfd, const
>> void *buf, const unsigned
>>         .msg = {}
>>     };
>> 
>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> -    n = write_all(streamfd, &msg, msgsize);
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>> +    n = write_all(&msg, msgsize);
>>     syslog(LOG_DEBUG,
>>            "wrote %ld bytes of header of data msg with frame of size %u
>>            bytes\n",
>>            n, msg.hdr.size);
>> @@ -279,7 +289,7 @@ static int spice_stream_send_frame(int streamfd, const
>> void *buf, const unsigned
>>                n, msgsize);
>>         return EXIT_FAILURE;
>>     }
>> -    n = write_all(streamfd, buf, size);
>> +    n = write_all(buf, size);
>>     syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>>     if (n != size) {
>>         syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
>> @@ -333,11 +343,10 @@ static void usage(const char *progname)
>>     exit(1);
>> }
>> 
>> -static void
>> -send_cursor(int streamfd,
>> -            uint16_t width, uint16_t height,
>> -            uint16_t hotspot_x, uint16_t hotspot_y,
>> -            std::function<void(uint32_t *)> fill_cursor)
>> +void
>> +Stream::send_cursor(uint16_t width, uint16_t height,
>> +                    uint16_t hotspot_x, uint16_t hotspot_y,
>> +                    std::function<void(uint32_t *)> fill_cursor)
>> {
>>     if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH || height >=
>>     STREAM_MSG_CURSOR_SET_MAX_HEIGHT) {
>>         return;
>> @@ -370,11 +379,11 @@ send_cursor(int streamfd,
>>     uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
>>     fill_cursor(pixels);
>> 
>> -    std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> -    write_all(streamfd, storage.get(), msgsize);
>> +    std::lock_guard<std::mutex> stream_guard(mutex);
>> +    write_all(storage.get(), msgsize);
>> }
>> 
>> -static void cursor_changes(int streamfd, Display *display, int event_base)
>> +static void cursor_changes(Stream *stream, Display *display, int event_base)
>> {
>>     unsigned long last_serial = 0;
>> 
>> @@ -399,18 +408,18 @@ static void cursor_changes(int streamfd, Display
>> *display, int event_base)
>>             for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
>>                 pixels[i] = cursor->pixels[i];
>>         };
>> -        send_cursor(streamfd,
>> -                    cursor->width, cursor->height, cursor->xhot,
>> cursor->yhot, fill_cursor);
>> +        stream->send_cursor(cursor->width, cursor->height,
>> +                            cursor->xhot, cursor->yhot, fill_cursor);
>>     }
>> }
>> 
>> static void
>> -do_capture(int streamfd, const char *streamport, FILE *f_log)
>> +do_capture(Stream &stream, const char *streamport, FILE *f_log)
>> {
>>     unsigned int frame_count = 0;
>>     while (!quit_requested) {
>>         while (!quit_requested && !streaming_requested) {
>> -            if (read_command(streamfd, true) < 0) {
>> +            if (stream.read_command(true) < 0) {
>>                 syslog(LOG_ERR, "FAILED to read command\n");
>>                 return;
>>             }
>> @@ -455,7 +464,7 @@ do_capture(int streamfd, const char *streamport, FILE
>> *f_log)
>> 
>>                 syslog(LOG_DEBUG, "wXh %uX%u  codec=%u\n", width, height,
>>                 codec);
>> 
>> -                if (spice_stream_send_format(streamfd, width, height, codec)
>> == EXIT_FAILURE) {
>> +                if (stream.send_format(width, height, codec) ==
>> EXIT_FAILURE) {
>>                     throw std::runtime_error("FAILED to send format
>>                     message");
>>                 }
>>             }
>> @@ -468,13 +477,12 @@ do_capture(int streamfd, const char *streamport, FILE
>> *f_log)
>>                     hexdump(frame.buffer, frame.buffer_size, f_log);
>>                 }
>>             }
>> -            if (spice_stream_send_frame(streamfd,
>> -                                        frame.buffer, frame.buffer_size) ==
>> EXIT_FAILURE) {
>> +            if (stream.send_frame(frame.buffer, frame.buffer_size) ==
>> EXIT_FAILURE) {
>>                 syslog(LOG_ERR, "FAILED to send a frame\n");
>>                 break;
>>             }
>>             //usleep(1);
>> -            if (read_command(streamfd, false) < 0) {
>> +            if (stream.read_command(false) < 0) {
>>                 syslog(LOG_ERR, "FAILED to read command\n");
>>                 return;
>>             }
>> @@ -576,12 +584,12 @@ int main(int argc, char* argv[])
>>     XFixesSelectCursorInput(display, rootwindow,
>>     XFixesDisplayCursorNotifyMask);
>> 
>>     Stream stream(streamport);
>> -    std::thread cursor_th(cursor_changes, stream.file_descriptor(), display,
>> event_base);
>> +    std::thread cursor_th(cursor_changes, &stream, display, event_base);
>>     cursor_th.detach();
>> 
>>     int ret = EXIT_SUCCESS;
>>     try {
>> -        do_capture(stream.file_descriptor(), streamport, f_log);
>> +        do_capture(stream, streamport, f_log);
>>     }
>>     catch (std::runtime_error &err) {
>>         syslog(LOG_ERR, "%s\n", err.what());
> 
> Frediano
> _______________________________________________
> Spice-devel mailing list
> Spice-devel at lists.freedesktop.org
> https://lists.freedesktop.org/mailman/listinfo/spice-devel



More information about the Spice-devel mailing list