[Spice-devel] [PATCH 11/17] Move read, write and locking into the 'Stream' class
Christophe de Dinechin
christophe.de.dinechin at gmail.com
Tue Feb 20 09:47:16 UTC 2018
> On 20 Feb 2018, at 10:43, Lukáš Hrázký <lhrazky at redhat.com> wrote:
>
> On Fri, 2018-02-16 at 17:15 +0100, Christophe de Dinechin wrote:
>> From: Christophe de Dinechin <dinechin at redhat.com>
>>
>> Signed-off-by: Christophe de Dinechin <dinechin at redhat.com>
>> ---
>> src/spice-streaming-agent.cpp | 86 +++++++++++++++++++++++--------------------
>> 1 file changed, 47 insertions(+), 39 deletions(-)
>>
>> diff --git a/src/spice-streaming-agent.cpp b/src/spice-streaming-agent.cpp
>> index f0d79ae..a989ee7 100644
>> --- a/src/spice-streaming-agent.cpp
>> +++ b/src/spice-streaming-agent.cpp
>> @@ -71,18 +71,30 @@ class Stream
>> public:
>> Stream(const char *name)
>
> I would like to name the class something more descriptive for what it
> is becoming. Class named Stream in a file named "stream.{cpp,hpp}"
> could be almost anything.
But it’s not named “Stream”, it’s called “spice::streaming_agent::Stream” ;-)
I chose short names because I was in that namespace. Otherwise, I agree with you.
Do you think that the name is still too vague even within the namespace?
> My best name is StreamDispatcher so far, not
> entirely happy about it :)
>
>> {
>> - fd = open(name, O_RDWR);
>> - if (fd < 0)
>> + streamfd = open(name, O_RDWR);
>> + if (streamfd < 0)
>> throw std::runtime_error("failed to open streaming device");
>> }
>> ~Stream()
>> {
>> - close(fd);
>> + close(streamfd);
>> }
>> - operator int() { return fd; }
>> + operator int() { return streamfd; }
>> +
>> + int have_something_to_read(int timeout);
>> + int read_command_from_device(void);
>> + int read_command(bool blocking);
>> +
>> + size_t write_all(const void *buf, const size_t len);
>
> This method could also use a better name. write_bytes()?
> write_buffer()?
I intended to do the rename in a follow-up patch. My current choice is “write_packet”, precisely because it’s not writing bytes or a buffer: it’s making sure the whole packet gets sent.
>
>> + int send_format(unsigned w, unsigned h, uint8_t c);
>> + int send_frame(const void *buf, const unsigned size);
>> + void send_cursor(uint16_t width, uint16_t height,
>> + uint16_t hotspot_x, uint16_t hotspot_y,
>> + std::function<void(uint32_t *)> fill_cursor);
>>
>> private:
>> - int fd = -1;
>> + int streamfd = -1;
>> + std::mutex mutex;
>> };
>>
>> }} // namespace spice::streaming_agent
>> @@ -92,9 +104,8 @@ static bool streaming_requested = false;
>> static bool quit_requested = false;
>> static bool log_binary = false;
>> static std::set<SpiceVideoCodecType> client_codecs;
>> -static std::mutex stream_mtx;
>>
>> -static int have_something_to_read(int streamfd, int timeout)
>> +int Stream::have_something_to_read(int timeout)
>> {
>> struct pollfd pollfd = {streamfd, POLLIN, 0};
>>
>> @@ -110,13 +121,13 @@ static int have_something_to_read(int streamfd, int timeout)
>> return 0;
>> }
>>
>> -static int read_command_from_device(int streamfd)
>> +int Stream::read_command_from_device()
>> {
>> StreamDevHeader hdr;
>> uint8_t msg[64];
>> int n;
>>
>> - std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> + std::lock_guard<std::mutex> stream_guard(mutex);
>> n = read(streamfd, &hdr, sizeof(hdr));
>> if (n != sizeof(hdr)) {
>> syslog(LOG_WARNING,
>> @@ -155,29 +166,28 @@ static int read_command_from_device(int streamfd)
>> return 1;
>> }
>>
>> -static int read_command(int streamfd, bool blocking)
>> +int Stream::read_command(bool blocking)
>> {
>> int timeout = blocking?-1:0;
>> while (!quit_requested) {
>> - if (!have_something_to_read(streamfd, timeout)) {
>> + if (!have_something_to_read(timeout)) {
>> if (!blocking) {
>> return 0;
>> }
>> sleep(1);
>> continue;
>> }
>> - return read_command_from_device(streamfd);
>> + return read_command_from_device();
>> }
>>
>> return 1;
>> }
>>
>> -static size_t
>> -write_all(int fd, const void *buf, const size_t len)
>> +size_t Stream::write_all(const void *buf, const size_t len)
>> {
>> size_t written = 0;
>> while (written < len) {
>> - int l = write(fd, (const char *) buf + written, len - written);
>> + int l = write(streamfd, (const char *) buf + written, len - written);
>> if (l < 0) {
>> if (errno == EINTR) {
>> continue;
>> @@ -191,7 +201,7 @@ write_all(int fd, const void *buf, const size_t len)
>> return written;
>> }
>>
>> -static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_t c)
>> +int Stream::send_format(unsigned w, unsigned h, uint8_t c)
>> {
>> const size_t msgsize = sizeof(FormatMessage);
>> const size_t hdrsize = sizeof(StreamDevHeader);
>> @@ -210,14 +220,14 @@ static int spice_stream_send_format(int streamfd, unsigned w, unsigned h, uint8_
>> }
>> };
>> syslog(LOG_DEBUG, "writing format\n");
>> - std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> - if (write_all(streamfd, &msg, msgsize) != msgsize) {
>> + std::lock_guard<std::mutex> stream_guard(mutex);
>> + if (write_all(&msg, msgsize) != msgsize) {
>> return EXIT_FAILURE;
>> }
>> return EXIT_SUCCESS;
>> }
>>
>> -static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned size)
>> +int Stream::send_frame(const void *buf, const unsigned size)
>> {
>> ssize_t n;
>> const size_t msgsize = sizeof(FormatMessage);
>> @@ -231,8 +241,8 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
>> .msg = {}
>> };
>>
>> - std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> - n = write_all(streamfd, &msg, msgsize);
>> + std::lock_guard<std::mutex> stream_guard(mutex);
>> + n = write_all(&msg, msgsize);
>> syslog(LOG_DEBUG,
>> "wrote %ld bytes of header of data msg with frame of size %u bytes\n",
>> n, msg.hdr.size);
>> @@ -241,7 +251,7 @@ static int spice_stream_send_frame(int streamfd, const void *buf, const unsigned
>> n, msgsize);
>> return EXIT_FAILURE;
>> }
>> - n = write_all(streamfd, buf, size);
>> + n = write_all(buf, size);
>> syslog(LOG_DEBUG, "wrote data msg body of size %ld\n", n);
>> if (n != size) {
>> syslog(LOG_WARNING, "write_all header: wrote %ld expected %u\n",
>> @@ -294,11 +304,10 @@ static void usage(const char *progname)
>> exit(1);
>> }
>>
>> -static void
>> -send_cursor(int streamfd,
>> - uint16_t width, uint16_t height,
>> - uint16_t hotspot_x, uint16_t hotspot_y,
>> - std::function<void(uint32_t *)> fill_cursor)
>> +void
>> +Stream::send_cursor(uint16_t width, uint16_t height,
>> + uint16_t hotspot_x, uint16_t hotspot_y,
>> + std::function<void(uint32_t *)> fill_cursor)
>> {
>> if (width >= STREAM_MSG_CURSOR_SET_MAX_WIDTH ||
>> height >= STREAM_MSG_CURSOR_SET_MAX_HEIGHT)
>> @@ -332,11 +341,11 @@ send_cursor(int streamfd,
>> uint32_t *pixels = reinterpret_cast<uint32_t *>(cursor_msg->msg.data);
>> fill_cursor(pixels);
>>
>> - std::lock_guard<std::mutex> stream_guard(stream_mtx);
>> - write_all(streamfd, storage.get(), cursor_msgsize);
>> + std::lock_guard<std::mutex> stream_guard(mutex);
>> + write_all(storage.get(), cursor_msgsize);
>> }
>>
>> -static void cursor_changes(int streamfd, Display *display, int event_base)
>> +static void cursor_changes(Stream *stream, Display *display, int event_base)
>> {
>> unsigned long last_serial = 0;
>>
>> @@ -358,18 +367,18 @@ static void cursor_changes(int streamfd, Display *display, int event_base)
>> for (unsigned i = 0; i < cursor->width * cursor->height; ++i)
>> pixels[i] = cursor->pixels[i];
>> };
>> - send_cursor(streamfd,
>> - cursor->width, cursor->height, cursor->xhot, cursor->yhot, fill_cursor);
>> + stream->send_cursor(cursor->width, cursor->height,
>> + cursor->xhot, cursor->yhot, fill_cursor);
>> }
>> }
>>
>> static void
>> -do_capture(int streamfd, const char *streamport, FILE *f_log)
>> +do_capture(Stream &stream, const char *streamport, FILE *f_log)
>> {
>> unsigned int frame_count = 0;
>> while (!quit_requested) {
>> while (!quit_requested && !streaming_requested) {
>> - if (read_command(streamfd, true) < 0) {
>> + if (stream.read_command(true) < 0) {
>> syslog(LOG_ERR, "FAILED to read command\n");
>> return;
>> }
>> @@ -413,7 +422,7 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
>>
>> syslog(LOG_DEBUG, "wXh %uX%u codec=%u\n", width, height, codec);
>>
>> - if (spice_stream_send_format(streamfd, width, height, codec) == EXIT_FAILURE)
>> + if (stream.send_format(width, height, codec) == EXIT_FAILURE)
>> throw std::runtime_error("FAILED to send format message");
>> }
>> if (f_log) {
>> @@ -425,13 +434,12 @@ do_capture(int streamfd, const char *streamport, FILE *f_log)
>> hexdump(frame.buffer, frame.buffer_size, f_log);
>> }
>> }
>> - if (spice_stream_send_frame(streamfd,
>> - frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
>> + if (stream.send_frame(frame.buffer, frame.buffer_size) == EXIT_FAILURE) {
>> syslog(LOG_ERR, "FAILED to send a frame\n");
>> break;
>> }
>> //usleep(1);
>> - if (read_command(streamfd, false) < 0) {
>> + if (stream.read_command(false) < 0) {
>> syslog(LOG_ERR, "FAILED to read command\n");
>> return;
>> }
>> @@ -520,7 +528,7 @@ int main(int argc, char* argv[])
>> XFixesSelectCursorInput(display, rootwindow, XFixesDisplayCursorNotifyMask);
>>
>> Stream streamfd(streamport);
>> - std::thread cursor_th(cursor_changes, (int) streamfd, display, event_base);
>> + std::thread cursor_th(cursor_changes, &streamfd, display, event_base);
>> cursor_th.detach();
>>
>> int ret = EXIT_SUCCESS;
More information about the Spice-devel mailing list