[systemd-devel] Early review request: socket activation bridge
Zbigniew Jędrzejewski-Szmek
zbyszek at in.waw.pl
Fri Oct 11 06:13:28 PDT 2013
On Thu, Oct 10, 2013 at 09:54:12PM -0700, David Strauss wrote:
> Here's a first take on having sabridge use the systemd-native event
> library. The current, full diff is also visible on GitHub [1].
>
> Obviously, this work still needs considerable cleanup and tightening.
> I like how we're currently hammering out the basics, like the event
> library to use and where the multiprocess/multithreaded logic should
> go in the longer-run.
>
> I'm open to better ideas for the data structures. Right now, the
> priority is to hammer everything into symmetric structures so the
> bi-directionality of the proxy gets abstracted away from the transfer
> function. This is useful for ensuring we have consistent support for
> server-first (MySQL) and client-first (HTTP) protocols.
>
> [1] https://github.com/systemd/systemd/pull/5/files
> /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
>
> /***
> This file is part of systemd.
>
> Copyright 2013 David Strauss
>
> systemd is free software; you can redistribute it and/or modify it
> under the terms of the GNU Lesser General Public License as published by
> the Free Software Foundation; either version 2.1 of the License, or
> (at your option) any later version.
>
> systemd is distributed in the hope that it will be useful, but
> WITHOUT ANY WARRANTY; without even the implied warranty of
> MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> Lesser General Public License for more details.
>
> You should have received a copy of the GNU Lesser General Public License
> along with systemd; If not, see <http://www.gnu.org/licenses/>.
> ***/
>
> #define __STDC_FORMAT_MACROS
> #include <errno.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <string.h>
> #include <netdb.h>
> #include <sys/fcntl.h>
> #include <sys/socket.h>
> #include <sys/un.h>
> #include <unistd.h>
>
> #include "log.h"
> #include "sd-daemon.h"
> #include "sd-event.h"
>
> #define BUFFER_SIZE 1024
>
> unsigned int total_clients = 0;
>
> struct proxy {
> int listen_fd;
> bool remote_is_inet;
> const char *remote_host;
> const char *remote_service;
> };
>
> struct connection {
> int origin_fd;
> int destination_fd;
> sd_event_source *w_destination;
> struct connection *c_destination;
> };
>
> static int transfer_data_cb(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
> struct connection *connection = (struct connection *) userdata;
>
> char *buffer = malloc(BUFFER_SIZE);
> ssize_t buffer_len;
>
> assert(revents & EPOLLIN);
> assert(fd == connection->origin_fd);
>
> log_info("About to transfer up to %u bytes from %d to %d.", BUFFER_SIZE, connection->origin_fd, connection->destination_fd);
>
> buffer_len = recv(connection->origin_fd, buffer, BUFFER_SIZE, 0);
> if (buffer_len == 0) {
> log_info("Clean disconnection.");
> sd_event_source_unref(connection->w_destination);
> sd_event_source_unref(s);
> close(connection->origin_fd);
> close(connection->destination_fd);
> free(connection->c_destination);
> free(connection);
> goto finish;
> }
> else if (buffer_len == -1) {
> log_error("Error %d in recv from fd=%d: %s", errno, connection->origin_fd, strerror(errno));
> exit(EXIT_FAILURE);
> }
>
> if (send(connection->destination_fd, buffer, buffer_len, 0) < 0) {
> log_error("Error %d in send to fd=%d: %s", errno, connection->destination_fd, strerror(errno));
> exit(EXIT_FAILURE);
> }
If I understand correctly, destination_fd is blocking, and if it is
busy, we'll block here. This means that there must be one thread per
connection direction... Why can't you have two buffers per connection,
one going in, one going out, attached to the connection object, and use
them in FIFO style? As an additional bonus, we'll avoid all those heavy
malloc calls.
> finish:
> free(buffer);
> return 0;
> }
>
> static int connected_to_server_cb(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
> struct connection *c_server_to_client = (struct connection *) userdata;
> struct sd_event *e = sd_event_get(s);
>
> log_info("Connected to server. Initializing watchers for sending data.");
>
> // Start listening for data sent by the client.
> sd_event_add_io(e, c_server_to_client->destination_fd, EPOLLIN, transfer_data_cb, c_server_to_client->c_destination, &c_server_to_client->w_destination);
>
> // Cancel the write watcher for the server.
> sd_event_source_unref(s);
>
> // Start listening for data sent by the server.
> sd_event_add_io(e, c_server_to_client->origin_fd, EPOLLIN, transfer_data_cb, c_server_to_client, &c_server_to_client->c_destination->w_destination);
>
> return 0;
> }
>
>
> static int set_nonblock(int fd) {
> int flags;
> flags = fcntl(fd, F_GETFL);
> flags |= O_NONBLOCK;
> return fcntl(fd, F_SETFL, flags);
> }
>
> static int get_server_connection_fd(const struct proxy *proxy) {
> int server_fd;
> int len;
>
> if (proxy->remote_is_inet) {
> struct addrinfo hints;
> struct addrinfo *result;
> int s;
>
> memset(&hints, 0, sizeof(struct addrinfo));
> hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */
> hints.ai_socktype = SOCK_STREAM; /* TCP */
> hints.ai_flags = AI_PASSIVE; /* Any IP address */
>
> //log_error("Looking up address info for %s:%s", proxy->remote_host, proxy->remote_service);
> s = getaddrinfo(proxy->remote_host, proxy->remote_service, &hints, &result);
> if (s != 0) {
> log_error("getaddrinfo error (%d): %s", s, gai_strerror(s));
> exit(EXIT_FAILURE);
> }
>
> if (result == NULL) {
> log_error("getaddrinfo: no result");
> exit(EXIT_FAILURE);
> }
>
> // @TODO: Try connecting to all results instead of just the first.
> server_fd = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
>
> if (-1 == set_nonblock(server_fd)) {
> log_error("Error setting socket to non-blocking.");
> exit(EXIT_FAILURE);
> }
Why not just say 'socket(..., ... | SOCK_NONBLOCK, ...);' here?
> if (!connect(server_fd, result->ai_addr, result->ai_addrlen)) {
> log_error("Could not connect to socket: %s:%s", proxy->remote_host, proxy->remote_service);
> freeaddrinfo(result);
> exit(EXIT_FAILURE);
> }
>
> freeaddrinfo(result);
> }
> else {
> struct sockaddr_un remote;
>
> server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
... and here too?
> if (-1 == server_fd) {
> log_error("Error %d while initializing socket socket: %s", errno, strerror(errno));
> exit(EXIT_FAILURE);
> }
>
> if (-1 == set_nonblock(server_fd)) {
> log_error("Error setting socket to non-blocking.");
> exit(EXIT_FAILURE);
> }
>
> remote.sun_family = AF_UNIX;
> strncpy(remote.sun_path, proxy->remote_host, sizeof(remote.sun_path));
> len = strlen(remote.sun_path) + sizeof(remote.sun_family);
> if (-1 == connect(server_fd, (struct sockaddr *) &remote, len)) {
> log_error("Could not connect to Unix domain socket: %s", proxy->remote_host);
> exit(EXIT_FAILURE);
> }
> }
>
> log_info("Server connection is fd=%d", server_fd);
>
> return server_fd;
> }
>
> static int accept_cb(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
> struct proxy *proxy = (struct proxy *) userdata;
> struct connection *c_server_to_client = malloc(sizeof(struct connection));
> struct connection *c_client_to_server = malloc(sizeof(struct connection));
> int client_fd, server_fd;
> sd_event_source *w_server = NULL;
>
> // @TODO: Remove assumption of IPv4.
> struct sockaddr_in client_addr;
> socklen_t client_len = sizeof(client_addr);
>
> assert(revents & EPOLLIN);
>
> server_fd = get_server_connection_fd(proxy);
> if (server_fd < 0) {
> log_error("Error initiating server connection.");
> goto fail;
> }
>
> client_fd = accept(fd, (struct sockaddr *) &client_addr, &client_len);
> if (client_fd < 0) {
> log_error("Error accepting client connection.");
> goto fail;
> }
>
> log_info("Client connection accepted with fd=%d", client_fd);
>
> total_clients++;
> log_info("Client successfully connected. Total clients: %u", total_clients);
>
> // Wait for the server socket to be writable before initializing
> // read events for the client socket.
> sd_event_add_io(sd_event_get(s), server_fd, EPOLLOUT, connected_to_server_cb, c_server_to_client, &w_server);
>
> c_server_to_client->origin_fd = server_fd;
> c_server_to_client->destination_fd = client_fd;
> c_server_to_client->w_destination = NULL;
> c_server_to_client->c_destination = c_client_to_server;
>
> c_client_to_server->origin_fd = client_fd;
> c_client_to_server->destination_fd = server_fd;
> c_client_to_server->w_destination = w_server;
> c_client_to_server->c_destination = c_server_to_client;
>
> goto finish;
>
> fail:
> free(c_client_to_server);
> free(c_server_to_client);
> sd_event_source_unref(w_server);
>
> finish:
> return 0;
> }
>
> static int run_main_loop(struct proxy *proxy) {
> int r = EXIT_SUCCESS;
> struct sd_event *e = NULL;
> sd_event_source *w_accept = NULL;
>
> r = sd_event_new(&e);
> if (r < 0)
> goto finish;
>
> r = set_nonblock(proxy->listen_fd);
> if (r < 0)
> goto finish;
>
> log_info("Initializing main listener fd=%d", proxy->listen_fd);
>
> sd_event_add_io(e, proxy->listen_fd, EPOLLIN, accept_cb, proxy, &w_accept);
>
> log_info("Initialized main listener. Entering loop.");
>
> sd_event_loop(e);
>
> finish:
> sd_event_source_unref(w_accept);
> sd_event_unref(e);
>
> return r;
> }
>
> int main(int argc, char *argv[]) {
> struct proxy proxy;
> int n, r;
>
> log_info("Starting up.");
>
> if (argc != 3) {
> fprintf(stderr, "usage: %s hostname service-or-port\n", argv[0]);
> exit(1);
> }
>
> proxy.listen_fd = SD_LISTEN_FDS_START;
> proxy.remote_host = argv[1];
> proxy.remote_service = argv[2];
> proxy.remote_is_inet = true; // @TODO: Support Unix domain sockets.
>
> assert(proxy.remote_host);
> assert(proxy.remote_service);
>
> n = sd_listen_fds(1);
> if (n < 0) {
> log_error("Failed to determine passed sockets: %s", strerror(-n));
> exit(EXIT_FAILURE);
> } else if (n > 1) {
> log_error("Can't listen on more than one socket.");
> exit(EXIT_FAILURE);
> }
>
> log_info("Initializing main loop.");
>
> r = run_main_loop(&proxy);
>
> log_info("Exiting with status %d.", r);
> return r;
> }
More information about the systemd-devel
mailing list