[Spice-devel] [PATCH 02/13] server: use proper methods for Stream read/write()
Marc-André Lureau
marcandre.lureau at redhat.com
Tue Feb 22 08:08:56 PST 2011
This allows easier modification of the underlying IO.
We also avoid using the "generic ctx pointer".
Also rename RedsStreamContext to RedsStream, a stylistic change (it's
obviously a context, no?).
---
server/inputs_channel.c | 2 +-
server/main_channel.c | 2 +-
server/red_channel.c | 25 +++---
server/red_channel.h | 6 +-
server/red_dispatcher.c | 8 +-
server/red_tunnel_worker.c | 4 +-
server/red_worker.c | 32 ++++----
server/reds.c | 196 +++++++++++++++++++++++---------------------
server/reds.h | 22 +++--
server/smartcard.c | 2 +-
server/snd_worker.c | 22 +++--
11 files changed, 171 insertions(+), 150 deletions(-)
diff --git a/server/inputs_channel.c b/server/inputs_channel.c
index b7ae55a..0781e62 100644
--- a/server/inputs_channel.c
+++ b/server/inputs_channel.c
@@ -508,7 +508,7 @@ static int inputs_channel_config_socket(RedChannel *channel)
return TRUE;
}
-static void inputs_link(Channel *channel, RedsStreamContext *peer, int migration,
+static void inputs_link(Channel *channel, RedsStream *peer, int migration,
int num_common_caps, uint32_t *common_caps, int num_caps,
uint32_t *caps)
{
diff --git a/server/main_channel.c b/server/main_channel.c
index f1fb4c6..ec234dd 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -776,7 +776,7 @@ static int main_channel_config_socket(RedChannel *channel)
return TRUE;
}
-static void main_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+static void main_channel_link(Channel *channel, RedsStream *peer, int migration,
int num_common_caps, uint32_t *common_caps, int num_caps,
uint32_t *caps)
{
diff --git a/server/red_channel.c b/server/red_channel.c
index a13ef0e..36e9f68 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -34,7 +34,7 @@ static void red_channel_pipe_clear(RedChannel *channel);
static void red_channel_event(int fd, int event, void *data);
/* return the number of bytes read. -1 in case of error */
-static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size)
+static int red_peer_receive(RedsStream *peer, uint8_t *buf, uint32_t size)
{
uint8_t *pos = buf;
while (size) {
@@ -42,7 +42,8 @@ static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size
if (peer->shutdown) {
return -1;
}
- if ((now = peer->cb_read(peer->ctx, pos, size)) <= 0) {
+ now = reds_stream_read(peer, pos, size);
+ if (now <= 0) {
if (now == 0) {
return -1;
}
@@ -65,7 +66,7 @@ static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size
return pos - buf;
}
-static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
+static void red_peer_handle_incoming(RedsStream *peer, IncomingHandler *handler)
{
int bytes_read;
uint8_t *parsed;
@@ -143,9 +144,10 @@ static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *h
}
}
-static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
+static void red_peer_handle_outgoing(RedsStream *peer, OutgoingHandler *handler)
{
- int n;
+ ssize_t n;
+
if (handler->size == 0) {
handler->vec = handler->vec_buf;
handler->size = handler->get_msg_size(handler->opaque);
@@ -153,9 +155,11 @@ static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *h
return;
}
}
+
for (;;) {
handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
- if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
+ n = reds_stream_writev(peer, handler->vec, handler->vec_size);
+ if (n == -1) {
switch (errno) {
case EAGAIN:
handler->on_block(handler->opaque);
@@ -243,7 +247,7 @@ static void red_channel_peer_on_out_msg_done(void *opaque)
}
}
-RedChannel *red_channel_create(int size, RedsStreamContext *peer,
+RedChannel *red_channel_create(int size, RedsStream *peer,
SpiceCoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
@@ -307,7 +311,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
error:
spice_marshaller_destroy(channel->send_data.marshaller);
free(channel);
- peer->cb_free(peer);
+ reds_stream_free(peer);
return NULL;
}
@@ -321,7 +325,7 @@ int do_nothing_handle_message(RedChannel *red_channel, SpiceDataHeader *header,
return TRUE;
}
-RedChannel *red_channel_create_parser(int size, RedsStreamContext *peer,
+RedChannel *red_channel_create_parser(int size, RedsStream *peer,
SpiceCoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
@@ -356,8 +360,7 @@ void red_channel_destroy(RedChannel *channel)
return;
}
red_channel_pipe_clear(channel);
- channel->core->watch_remove(channel->peer->watch);
- channel->peer->cb_free(channel->peer);
+ reds_stream_free(channel->peer);
spice_marshaller_destroy(channel->send_data.marshaller);
free(channel);
}
diff --git a/server/red_channel.h b/server/red_channel.h
index 893a7f8..ae58522 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -110,7 +110,7 @@ typedef void (*channel_on_incoming_error_proc)(RedChannel *channel);
typedef void (*channel_on_outgoing_error_proc)(RedChannel *channel);
struct RedChannel {
- RedsStreamContext *peer;
+ RedsStream *peer;
SpiceCoreInterface *core;
int migrate;
int handle_acks;
@@ -154,7 +154,7 @@ struct RedChannel {
/* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
explicitly destroy the channel */
-RedChannel *red_channel_create(int size, RedsStreamContext *peer,
+RedChannel *red_channel_create(int size, RedsStream *peer,
SpiceCoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
@@ -167,7 +167,7 @@ RedChannel *red_channel_create(int size, RedsStreamContext *peer,
/* alternative constructor, meant for marshaller based (inputs,main) channels,
* will become default eventually */
-RedChannel *red_channel_create_parser(int size, RedsStreamContext *peer,
+RedChannel *red_channel_create_parser(int size, RedsStream *peer,
SpiceCoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 2a3c297..3816e14 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -71,7 +71,7 @@ extern spice_wan_compression_t zlib_glz_state;
static RedDispatcher *dispatchers = NULL;
-static void red_dispatcher_set_peer(Channel *channel, RedsStreamContext *peer, int migration,
+static void red_dispatcher_set_peer(Channel *channel, RedsStream *peer, int migration,
int num_common_caps, uint32_t *common_caps, int num_caps,
uint32_t *caps)
{
@@ -81,7 +81,7 @@ static void red_dispatcher_set_peer(Channel *channel, RedsStreamContext *peer, i
dispatcher = (RedDispatcher *)channel->data;
RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT;
write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &peer, sizeof(RedsStreamContext *));
+ send_data(dispatcher->channel, &peer, sizeof(RedsStream *));
send_data(dispatcher->channel, &migration, sizeof(int));
}
@@ -101,7 +101,7 @@ static void red_dispatcher_migrate(Channel *channel)
write_message(dispatcher->channel, &message);
}
-static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStreamContext *peer,
+static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStream *peer,
int migration, int num_common_caps,
uint32_t *common_caps, int num_caps,
uint32_t *caps)
@@ -110,7 +110,7 @@ static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStreamContext *
red_printf("");
RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT;
write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &peer, sizeof(RedsStreamContext *));
+ send_data(dispatcher->channel, &peer, sizeof(RedsStream *));
send_data(dispatcher->channel, &migration, sizeof(int));
}
diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
index 6092a76..267de4a 100644
--- a/server/red_tunnel_worker.c
+++ b/server/red_tunnel_worker.c
@@ -598,7 +598,7 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer,
/* reds interface */
-static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+static void handle_tunnel_channel_link(Channel *channel, RedsStream *peer, int migration,
int num_common_caps, uint32_t *common_caps, int num_caps,
uint32_t *caps);
static void handle_tunnel_channel_shutdown(struct Channel *channel);
@@ -3420,7 +3420,7 @@ static void on_new_tunnel_channel(TunnelChannel *channel)
}
}
-static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+static void handle_tunnel_channel_link(Channel *channel, RedsStream *peer, int migration,
int num_common_caps, uint32_t *common_caps, int num_caps,
uint32_t *caps)
{
diff --git a/server/red_worker.c b/server/red_worker.c
index dc7bc9e..446fae4 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -356,7 +356,7 @@ struct RedChannel {
uint32_t id;
spice_parse_channel_func_t parser;
struct RedWorker *worker;
- RedsStreamContext *peer;
+ RedsStream *peer;
int migrate;
Ring pipe;
@@ -7324,9 +7324,9 @@ static void inline channel_release_res(RedChannel *channel)
static void red_send_data(RedChannel *channel, void *item)
{
for (;;) {
- uint32_t n = channel->send_data.size - channel->send_data.pos;
+ ssize_t n = channel->send_data.size - channel->send_data.pos;
struct iovec vec[MAX_SEND_VEC];
- int vec_size;
+ size_t vec_size;
if (!n) {
channel->send_data.blocked = FALSE;
@@ -7339,7 +7339,8 @@ static void red_send_data(RedChannel *channel, void *item)
vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
vec, MAX_SEND_VEC, channel->send_data.pos);
ASSERT(channel->peer);
- if ((n = channel->peer->cb_writev(channel->peer->ctx, vec, vec_size)) == -1) {
+ n = reds_stream_writev(channel->peer, vec, vec_size);
+ if (n == -1) {
switch (errno) {
case EAGAIN:
channel->send_data.blocked = TRUE;
@@ -8524,9 +8525,7 @@ static void red_disconnect_channel(RedChannel *channel)
{
channel_release_res(channel);
red_pipe_clear(channel);
-
- channel->peer->cb_free(channel->peer);
-
+ reds_stream_free(channel->peer);
channel->peer = NULL;
channel->send_data.blocked = FALSE;
channel->send_data.size = channel->send_data.pos = 0;
@@ -9253,7 +9252,8 @@ static void red_receive(RedChannel *channel)
n = channel->recive_data.end - channel->recive_data.now;
ASSERT(n);
ASSERT(channel->peer);
- if ((n = channel->peer->cb_read(channel->peer->ctx, channel->recive_data.now, n)) <= 0) {
+ n = reds_stream_read(channel->peer, channel->recive_data.now, n);
+ if (n <= 0) {
if (n == 0) {
channel->disconnect(channel);
return;
@@ -9319,7 +9319,7 @@ static void red_receive(RedChannel *channel)
}
static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
- RedsStreamContext *peer, int migrate,
+ RedsStream *peer, int migrate,
event_listener_action_proc handler,
disconnect_channel_proc disconnect,
hold_item_proc hold_item,
@@ -9383,7 +9383,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
error2:
free(channel);
error1:
- peer->cb_free(peer);
+ reds_stream_free(peer);
return NULL;
}
@@ -9445,7 +9445,7 @@ static void display_channel_release_item(RedChannel *channel, void *item)
}
}
-static void handle_new_display_channel(RedWorker *worker, RedsStreamContext *peer, int migrate)
+static void handle_new_display_channel(RedWorker *worker, RedsStream *peer, int migrate)
{
DisplayChannel *display_channel;
size_t stream_buf_size;
@@ -9568,7 +9568,7 @@ static void cursor_channel_release_item(RedChannel *channel, void *item)
red_release_cursor(channel->worker, item);
}
-static void red_connect_cursor(RedWorker *worker, RedsStreamContext *peer, int migrate)
+static void red_connect_cursor(RedWorker *worker, RedsStream *peer, int migrate)
{
CursorChannel *channel;
@@ -10004,11 +10004,11 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
handle_dev_destroy_primary_surface(worker);
break;
case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
- RedsStreamContext *peer;
+ RedsStream *peer;
int migrate;
red_printf("connect");
- receive_data(worker->channel, &peer, sizeof(RedsStreamContext *));
+ receive_data(worker->channel, &peer, sizeof(RedsStream *));
receive_data(worker->channel, &migrate, sizeof(int));
handle_new_display_channel(worker, peer, migrate);
break;
@@ -10052,11 +10052,11 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
red_migrate_display(worker);
break;
case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
- RedsStreamContext *peer;
+ RedsStream *peer;
int migrate;
red_printf("cursor connect");
- receive_data(worker->channel, &peer, sizeof(RedsStreamContext *));
+ receive_data(worker->channel, &peer, sizeof(RedsStream *));
receive_data(worker->channel, &migrate, sizeof(int));
red_connect_cursor(worker, peer, migrate);
break;
diff --git a/server/reds.c b/server/reds.c
index d92f701..d597e93 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -224,7 +224,7 @@ typedef struct RedsState {
static RedsState *reds = NULL;
typedef struct AsyncRead {
- RedsStreamContext *peer;
+ RedsStream *peer;
void *opaque;
uint8_t *now;
uint8_t *end;
@@ -233,7 +233,7 @@ typedef struct AsyncRead {
} AsyncRead;
typedef struct RedLinkInfo {
- RedsStreamContext *peer;
+ RedsStream *peer;
AsyncRead asyc_read;
SpiceLinkHeader link_header;
SpiceLinkMess *link_mess;
@@ -297,85 +297,65 @@ static ChannelSecurityOptions *find_channel_security(int id)
return now;
}
-static void reds_channel_event(RedsStreamContext *peer, int event)
+static void reds_channel_event(RedsStream *peer, int event)
{
if (core->base.minor_version < 3 || core->channel_event == NULL)
return;
core->channel_event(event, &peer->info);
}
-static int reds_write(void *ctx, void *buf, size_t size)
+static ssize_t stream_write_cb(RedsStream *s, const void *buf, size_t size)
{
- int return_code;
- int sock = (long)ctx;
- size_t count = size;
-
- return_code = write(sock, buf, count);
-
- return (return_code);
+ return write(s->socket, buf, size);
}
-static int reds_read(void *ctx, void *buf, size_t size)
+static ssize_t stream_writev_cb(RedsStream *s, const struct iovec *iov, int iovcnt)
{
- int return_code;
- int sock = (long)ctx;
- size_t count = size;
-
- return_code = read(sock, buf, count);
-
- return (return_code);
+ return writev(s->socket, iov, iovcnt);
}
-static int reds_free(RedsStreamContext *peer)
+static ssize_t stream_read_cb(RedsStream *s, void *buf, size_t size)
{
- reds_channel_event(peer, SPICE_CHANNEL_EVENT_DISCONNECTED);
- close(peer->socket);
- free(peer);
- return 0;
+ return read(s->socket, buf, size);
}
-static int reds_ssl_write(void *ctx, void *buf, size_t size)
+static ssize_t stream_ssl_write_cb(RedsStream *s, const void *buf, size_t size)
{
int return_code;
int ssl_error;
- SSL *ssl = ctx;
- return_code = SSL_write(ssl, buf, size);
+ return_code = SSL_write(s->ssl, buf, size);
- if (return_code < 0) {
- ssl_error = SSL_get_error(ssl, return_code);
- }
+ if (return_code < 0)
+ ssl_error = SSL_get_error(s->ssl, return_code);
- return (return_code);
+ return return_code;
}
-static int reds_ssl_read(void *ctx, void *buf, size_t size)
+static ssize_t stream_ssl_read_cb(RedsStream *s, void *buf, size_t size)
{
int return_code;
int ssl_error;
- SSL *ssl = ctx;
- return_code = SSL_read(ssl, buf, size);
+ return_code = SSL_read(s->ssl, buf, size);
- if (return_code < 0) {
- ssl_error = SSL_get_error(ssl, return_code);
- }
+ if (return_code < 0)
+ ssl_error = SSL_get_error(s->ssl, return_code);
- return (return_code);
+ return return_code;
}
-static int reds_ssl_writev(void *ctx, const struct iovec *vector, int count)
+static ssize_t stream_ssl_writev_cb(RedsStream *s, const struct iovec *vector, int count)
{
int i;
int n;
- int return_code = 0;
+ ssize_t return_code = 0;
int ssl_error;
- SSL *ssl = ctx;
for (i = 0; i < count; ++i) {
- n = SSL_write(ssl, vector[i].iov_base, vector[i].iov_len);
+ n = SSL_write(s->ssl, vector[i].iov_base, vector[i].iov_len);
if (n <= 0) {
- ssl_error = SSL_get_error(ssl, n);
+ ssl_error = SSL_get_error(s->ssl, n);
if (return_code <= 0) {
return n;
} else {
@@ -389,35 +369,31 @@ static int reds_ssl_writev(void *ctx, const struct iovec *vector, int count)
return return_code;
}
-static int reds_ssl_free(RedsStreamContext *peer)
+static void reds_stream_remove_watch(RedsStream* s)
{
- reds_channel_event(peer, SPICE_CHANNEL_EVENT_DISCONNECTED);
- SSL_free(peer->ssl);
- close(peer->socket);
- free(peer);
- return 0;
+ if (s->watch) {
+ core->watch_remove(s->watch);
+ s->watch = NULL;
+ }
}
-static void __reds_release_link(RedLinkInfo *link)
+static void reds_link_free(RedLinkInfo *link)
{
- ASSERT(link->peer);
- if (link->peer->watch) {
- core->watch_remove(link->peer->watch);
- link->peer->watch = NULL;
- }
+ reds_stream_free(link->peer);
+ link->peer = NULL;
+
free(link->link_mess);
+ link->link_mess = NULL;
+
BN_free(link->tiTicketing.bn);
+ link->tiTicketing.bn = NULL;
+
if (link->tiTicketing.rsa) {
RSA_free(link->tiTicketing.rsa);
+ link->tiTicketing.rsa = NULL;
}
- free(link);
-}
-static inline void reds_release_link(RedLinkInfo *link)
-{
- RedsStreamContext *peer = link->peer;
- __reds_release_link(link);
- peer->cb_free(peer);
+ free(link);
}
#ifdef RED_STATISTICS
@@ -1367,11 +1343,11 @@ void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end)
while (write_to_vdi_port() || read_from_vdi_port());
}
-static int sync_write(RedsStreamContext *peer, void *in_buf, size_t n)
+static int sync_write(RedsStream *peer, void *in_buf, size_t n)
{
uint8_t *buf = (uint8_t *)in_buf;
while (n) {
- int now = peer->cb_write(peer->ctx, buf, n);
+ int now = reds_stream_write(peer, buf, n);
if (now <= 0) {
if (now == -1 && (errno == EINTR || errno == EAGAIN)) {
continue;
@@ -1479,7 +1455,7 @@ static void reds_send_link_result(RedLinkInfo *link, uint32_t error)
// actually be joined with reds_handle_other_links, ebcome reds_handle_link
static void reds_handle_main_link(RedLinkInfo *link)
{
- RedsStreamContext *peer;
+ RedsStream *peer;
SpiceLinkMess *link_mess;
uint32_t *caps;
uint32_t connection_id;
@@ -1497,7 +1473,7 @@ static void reds_handle_main_link(RedLinkInfo *link)
} else {
if (link_mess->connection_id != reds->link_id) {
reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
- reds_release_link(link);
+ reds_link_free(link);
return;
}
reds_send_link_result(link, SPICE_LINK_ERR_OK);
@@ -1512,8 +1488,10 @@ static void reds_handle_main_link(RedLinkInfo *link)
reds_show_new_channel(link, connection_id);
peer = link->peer;
+ reds_stream_remove_watch(peer);
+ link->peer = NULL;
link->link_mess = NULL;
- __reds_release_link(link);
+ reds_link_free(link);
caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
reds->main_channel = main_channel_init();
reds->main_channel->link(reds->main_channel, peer, reds->mig_target, link_mess->num_common_caps,
@@ -1580,7 +1558,7 @@ static void openssl_init(RedLinkInfo *link)
static void reds_handle_other_links(RedLinkInfo *link)
{
Channel *channel;
- RedsStreamContext *peer;
+ RedsStream *peer;
SpiceLinkMess *link_mess;
uint32_t *caps;
@@ -1588,14 +1566,14 @@ static void reds_handle_other_links(RedLinkInfo *link)
if (!reds->link_id || reds->link_id != link_mess->connection_id) {
reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID);
- reds_release_link(link);
+ reds_link_free(link);
return;
}
if (!(channel = reds_find_channel(link_mess->channel_type,
link_mess->channel_id))) {
reds_send_link_result(link, SPICE_LINK_ERR_CHANNEL_NOT_AVAILABLE);
- reds_release_link(link);
+ reds_link_free(link);
return;
}
@@ -1607,8 +1585,10 @@ static void reds_handle_other_links(RedLinkInfo *link)
main_channel_push_notify(reds->main_channel, (uint8_t*)mess, mess_len);
}
peer = link->peer;
+ reds_stream_remove_watch(peer);
+ link->peer = NULL;
link->link_mess = NULL;
- __reds_release_link(link);
+ reds_link_free(link);
caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
channel->link(channel, peer, reds->mig_target, link_mess->num_common_caps,
link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
@@ -1636,13 +1616,13 @@ static void reds_handle_ticket(void *opaque)
reds_send_link_result(link, SPICE_LINK_ERR_PERMISSION_DENIED);
red_printf("Ticketing is enabled, but no password is set. "
"please set a ticket first");
- reds_release_link(link);
+ reds_link_free(link);
return;
}
if (expired || strncmp(password, actual_sever_pass, SPICE_MAX_PASSWORD_LENGTH) != 0) {
reds_send_link_result(link, SPICE_LINK_ERR_PERMISSION_DENIED);
- reds_release_link(link);
+ reds_link_free(link);
return;
}
}
@@ -1670,7 +1650,8 @@ static void async_read_handler(int fd, int event, void *data)
int n = obj->end - obj->now;
ASSERT(n > 0);
- if ((n = obj->peer->cb_read(obj->peer->ctx, obj->now, n)) <= 0) {
+ n = reds_stream_read(obj->peer, obj->now, n);
+ if (n <= 0) {
if (n < 0) {
switch (errno) {
case EAGAIN:
@@ -1722,7 +1703,7 @@ static void reds_handle_read_link_done(void *opaque)
link->link_header.size ||
link_mess->caps_offset < sizeof(*link_mess))) {
reds_send_link_error(link, SPICE_LINK_ERR_INVALID_DATA);
- reds_release_link(link);
+ reds_link_free(link);
return;
}
@@ -1734,12 +1715,12 @@ static void reds_handle_read_link_done(void *opaque)
red_printf("spice channels %d should be encrypted", link_mess->channel_type);
reds_send_link_error(link, SPICE_LINK_ERR_NEED_SECURED);
}
- reds_release_link(link);
+ reds_link_free(link);
return;
}
if (!reds_send_link_ack(link)) {
- reds_release_link(link);
+ reds_link_free(link);
return;
}
@@ -1760,7 +1741,7 @@ static void reds_handle_link_error(void *opaque, int err)
red_printf("%s", strerror(errno));
break;
}
- reds_release_link(link);
+ reds_link_free(link);
}
static void reds_handle_read_header_done(void *opaque)
@@ -1771,7 +1752,7 @@ static void reds_handle_read_header_done(void *opaque)
if (header->magic != SPICE_MAGIC) {
reds_send_link_error(link, SPICE_LINK_ERR_INVALID_MAGIC);
- reds_release_link(link);
+ reds_link_free(link);
return;
}
@@ -1781,7 +1762,7 @@ static void reds_handle_read_header_done(void *opaque)
}
red_printf("version mismatch");
- reds_release_link(link);
+ reds_link_free(link);
return;
}
@@ -1790,7 +1771,7 @@ static void reds_handle_read_header_done(void *opaque)
if (header->size < sizeof(SpiceLinkMess)) {
reds_send_link_error(link, SPICE_LINK_ERR_INVALID_DATA);
red_printf("bad size %u", header->size);
- reds_release_link(link);
+ reds_link_free(link);
return;
}
@@ -1823,7 +1804,7 @@ static void reds_handle_ssl_accept(int fd, int event, void *data)
int ssl_error = SSL_get_error(link->peer->ssl, return_code);
if (ssl_error != SSL_ERROR_WANT_READ && ssl_error != SSL_ERROR_WANT_WRITE) {
red_printf("SSL_accept failed, error=%d", ssl_error);
- reds_release_link(link);
+ reds_link_free(link);
} else {
if (ssl_error == SSL_ERROR_WANT_READ) {
core->watch_update_mask(link->peer->watch, SPICE_WATCH_EVENT_READ);
@@ -1841,7 +1822,7 @@ static void reds_handle_ssl_accept(int fd, int event, void *data)
static RedLinkInfo *__reds_accept_connection(int listen_socket)
{
RedLinkInfo *link;
- RedsStreamContext *peer;
+ RedsStream *peer;
int delay_val = 1;
int flags;
int socket;
@@ -1866,7 +1847,7 @@ static RedLinkInfo *__reds_accept_connection(int listen_socket)
}
link = spice_new0(RedLinkInfo, 1);
- peer = spice_new0(RedsStreamContext, 1);
+ peer = spice_new0(RedsStream, 1);
link->peer = peer;
peer->socket = socket;
@@ -1890,17 +1871,16 @@ error:
static RedLinkInfo *reds_accept_connection(int listen_socket)
{
RedLinkInfo *link;
- RedsStreamContext *peer;
+ RedsStream *peer;
if (!(link = __reds_accept_connection(listen_socket))) {
return NULL;
}
peer = link->peer;
peer->ctx = (void *)((unsigned long)link->peer->socket);
- peer->cb_read = (int (*)(void *, void *, int))reds_read;
- peer->cb_write = (int (*)(void *, void *, int))reds_write;
- peer->cb_writev = (int (*)(void *, const struct iovec *vector, int count))writev;
- peer->cb_free = (int (*)(RedsStreamContext *))reds_free;
+ peer->read = stream_read_cb;
+ peer->write = stream_write_cb;
+ peer->writev = stream_writev_cb;
return link;
}
@@ -1933,10 +1913,9 @@ static void reds_accept_ssl_connection(int fd, int event, void *data)
SSL_set_bio(link->peer->ssl, sbio, sbio);
link->peer->ctx = (void *)(link->peer->ssl);
- link->peer->cb_write = (int (*)(void *, void *, int))reds_ssl_write;
- link->peer->cb_read = (int (*)(void *, void *, int))reds_ssl_read;
- link->peer->cb_writev = reds_ssl_writev;
- link->peer->cb_free = (int (*)(RedsStreamContext *))reds_ssl_free;
+ link->peer->write = stream_ssl_write_cb;
+ link->peer->read = stream_ssl_read_cb;
+ link->peer->writev = stream_ssl_writev_cb;
return_code = SSL_accept(link->peer->ssl);
if (return_code == 1) {
@@ -3169,3 +3148,34 @@ __visible__ int spice_server_migrate_switch(SpiceServer *s)
reds_mig_switch();
return 0;
}
+
+ssize_t reds_stream_read(RedsStream *s, void *buf, size_t nbyte)
+{
+ return s->read(s, buf, nbyte);
+}
+
+ssize_t reds_stream_write(RedsStream *s, const void *buf, size_t nbyte)
+{
+ return s->write(s, buf, nbyte);
+}
+
+ssize_t reds_stream_writev(RedsStream *s, const struct iovec *iov, int iovcnt)
+{
+ return s->writev(s, iov, iovcnt);
+}
+
+void reds_stream_free(RedsStream *s)
+{
+ if (!s)
+ return;
+
+ reds_channel_event(s, SPICE_CHANNEL_EVENT_DISCONNECTED);
+
+ if (s->ssl)
+ SSL_free(s->ssl);
+
+ reds_stream_remove_watch(s);
+ close(s->socket);
+
+ free(s);
+}
diff --git a/server/reds.h b/server/reds.h
index 547c33c..63b73c4 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -28,7 +28,9 @@
#define __visible__ __attribute__ ((visibility ("default")))
-typedef struct RedsStreamContext {
+typedef struct RedsStream RedsStream;
+
+struct RedsStream {
void *ctx;
int socket;
@@ -41,12 +43,11 @@ typedef struct RedsStreamContext {
SpiceChannelEventInfo info;
- int (*cb_write)(void *, void *, int);
- int (*cb_read)(void *, void *, int);
-
- int (*cb_writev)(void *, const struct iovec *vector, int count);
- int (*cb_free)(struct RedsStreamContext *);
-} RedsStreamContext;
+ /* private */
+ ssize_t (*read)(RedsStream *s, void *buf, size_t nbyte);
+ ssize_t (*write)(RedsStream *s, const void *buf, size_t nbyte);
+ ssize_t (*writev)(RedsStream *s, const struct iovec *iov, int iovcnt);
+};
typedef struct Channel {
struct Channel *next;
@@ -56,7 +57,7 @@ typedef struct Channel {
uint32_t *common_caps;
int num_caps;
uint32_t *caps;
- void (*link)(struct Channel *, RedsStreamContext *peer, int migration, int num_common_caps,
+ void (*link)(struct Channel *, RedsStream *peer, int migration, int num_common_caps,
uint32_t *common_caps, int num_caps, uint32_t *caps);
void (*shutdown)(struct Channel *);
void (*migrate)(struct Channel *);
@@ -73,6 +74,11 @@ struct SpiceNetWireState {
struct TunnelWorker *worker;
};
+ssize_t reds_stream_read(RedsStream *s, void *buf, size_t nbyte);
+ssize_t reds_stream_write(RedsStream *s, const void *buf, size_t nbyte);
+ssize_t reds_stream_writev(RedsStream *s, const struct iovec *iov, int iovcnt);
+void reds_stream_free(RedsStream *s);
+
void reds_desable_mm_timer();
void reds_enable_mm_timer();
void reds_update_mm_timer(uint32_t mm_time);
diff --git a/server/smartcard.c b/server/smartcard.c
index 7c0a5aa..a7d26b6 100644
--- a/server/smartcard.c
+++ b/server/smartcard.c
@@ -485,7 +485,7 @@ static int smartcard_channel_handle_message(RedChannel *channel, SpiceDataHeader
return TRUE;
}
-static void smartcard_link(Channel *channel, RedsStreamContext *peer,
+static void smartcard_link(Channel *channel, RedsStream *peer,
int migration, int num_common_caps,
uint32_t *common_caps, int num_caps,
uint32_t *caps)
diff --git a/server/snd_worker.c b/server/snd_worker.c
index 6c0f9d6..2382a29 100644
--- a/server/snd_worker.c
+++ b/server/snd_worker.c
@@ -73,7 +73,7 @@ typedef void (*cleanup_channel_proc)(SndChannel *channel);
typedef struct SndWorker SndWorker;
struct SndChannel {
- RedsStreamContext *peer;
+ RedsStream *peer;
SndWorker *worker;
spice_parse_channel_func_t parser;
@@ -180,15 +180,15 @@ static void snd_disconnect_channel(SndChannel *channel)
{
SndWorker *worker;
- if (!channel) {
+ if (!channel)
return;
- }
+
channel->cleanup(channel);
worker = channel->worker;
worker->connection = NULL;
core->watch_remove(channel->peer->watch);
channel->peer->watch = NULL;
- channel->peer->cb_free(channel->peer);
+ reds_stream_free(channel->peer);
spice_marshaller_destroy(channel->send_data.marshaller);
free(channel);
}
@@ -243,7 +243,8 @@ static int snd_send_data(SndChannel *channel)
vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
vec, MAX_SEND_VEC, channel->send_data.pos);
- if ((n = channel->peer->cb_writev(channel->peer->ctx, vec, vec_size)) == -1) {
+ n = reds_stream_writev(channel->peer, vec, vec_size);
+ if (n == -1) {
switch (errno) {
case EAGAIN:
channel->blocked = TRUE;
@@ -389,7 +390,8 @@ static void snd_receive(void* data)
ssize_t n;
n = channel->recive_data.end - channel->recive_data.now;
ASSERT(n);
- if ((n = channel->peer->cb_read(channel->peer->ctx, channel->recive_data.now, n)) <= 0) {
+ n = reds_stream_read(channel->peer, channel->recive_data.now, n);
+ if (n <= 0) {
if (n == 0) {
snd_disconnect_channel(channel);
return;
@@ -734,7 +736,7 @@ static void snd_record_send(void* data)
}
static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_id,
- RedsStreamContext *peer,
+ RedsStream *peer,
int migrate, send_messages_proc send_messages,
handle_message_proc handle_message,
on_message_done_proc on_message_done,
@@ -800,7 +802,7 @@ error2:
free(channel);
error1:
- peer->cb_free(peer);
+ reds_stream_free(peer);
return NULL;
}
@@ -931,7 +933,7 @@ static void snd_playback_cleanup(SndChannel *channel)
celt051_mode_destroy(playback_channel->celt_mode);
}
-static void snd_set_playback_peer(Channel *channel, RedsStreamContext *peer, int migration,
+static void snd_set_playback_peer(Channel *channel, RedsStream *peer, int migration,
int num_common_caps, uint32_t *common_caps, int num_caps,
uint32_t *caps)
{
@@ -1097,7 +1099,7 @@ static void snd_record_cleanup(SndChannel *channel)
celt051_mode_destroy(record_channel->celt_mode);
}
-static void snd_set_record_peer(Channel *channel, RedsStreamContext *peer, int migration,
+static void snd_set_record_peer(Channel *channel, RedsStream *peer, int migration,
int num_common_caps, uint32_t *common_caps, int num_caps,
uint32_t *caps)
{
--
1.7.4
More information about the Spice-devel
mailing list