[igt-dev] [PATCH i-g-t 3/7] runner: Use socket communications
Kamil Konieczny
kamil.konieczny at linux.intel.com
Fri Oct 28 13:47:11 UTC 2022
On 2022-10-10 at 17:57:04 +0300, Petri Latvala wrote:
> Set up a socket and pass it to the tests. Dump received communications
> to disk.
>
> When generating results.json, use socket comms if it exists.
>
> Signed-off-by: Petri Latvala <petri.latvala at intel.com>
> Cc: Arkadiusz Hiler <arek at hiler.eu>
Acked-by: Kamil Konieczny <kamil.konieczny at linux.intel.com>
> ---
> runner/executor.c | 354 ++++++++++++++++++++++---
> runner/executor.h | 1 +
> runner/resultgen.c | 631 ++++++++++++++++++++++++++++++++++++++++++++-
> 3 files changed, 947 insertions(+), 39 deletions(-)
>
> diff --git a/runner/executor.c b/runner/executor.c
> index 964d0063..125f08a2 100644
> --- a/runner/executor.c
> +++ b/runner/executor.c
> @@ -12,9 +12,11 @@
> #include <stdlib.h>
> #include <string.h>
> #include <sys/ioctl.h>
> +#include <sys/mman.h>
> #include <sys/select.h>
> #include <sys/poll.h>
> #include <sys/signalfd.h>
> +#include <sys/socket.h>
> #include <sys/stat.h>
> #include <sys/time.h>
> #include <sys/types.h>
> @@ -29,6 +31,7 @@
> #include "igt_taints.h"
> #include "executor.h"
> #include "output_strings.h"
> +#include "runnercomms.h"
>
> #define KMSG_HEADER "[IGT] "
> #define KMSG_WARN 4
> @@ -365,7 +368,7 @@ static char *need_to_abort(const struct settings* settings)
> return NULL;
> }
>
> -static void prune_subtest(struct job_list_entry *entry, char *subtest)
> +static void prune_subtest(struct job_list_entry *entry, const char *subtest)
> {
> char *excl;
>
> @@ -442,11 +445,77 @@ static bool prune_from_journal(struct job_list_entry *entry, int fd)
> return pruned > 0;
> }
>
> +struct prune_comms_data
> +{
> + struct job_list_entry *entry;
> + int pruned;
> + bool got_exit;
> +};
> +
> +static bool prune_handle_subtest_start(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct prune_comms_data *data = userdata;
> +
> + prune_subtest(data->entry, helper.subteststart.name);
> + data->pruned++;
> +
> + return true;
> +}
> +
> +static bool prune_handle_exit(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct prune_comms_data *data = userdata;
> +
> + data->got_exit = true;
> +
> + return true;
> +}
> +
> +static bool prune_from_comms(struct job_list_entry *entry, int fd)
> +{
> + struct prune_comms_data data = {
> + .entry = entry,
> + .pruned = 0,
> + .got_exit = false,
> + };
> + struct comms_visitor visitor = {
> + .subtest_start = prune_handle_subtest_start,
> + .exit = prune_handle_exit,
> +
> + .userdata = &data,
> + };
> + size_t old_count = entry->subtest_count;
> +
> + if (comms_read_dump(fd, &visitor) == COMMSPARSE_ERROR)
> + return false;
> +
> + /*
> + * If we know the subtests we originally wanted to run, check
> + * if we got an equal amount already.
> + */
> + if (old_count > 0 && data.pruned >= old_count)
> + entry->binary[0] = '\0';
> +
> + /*
> + * If we don't know how many subtests there should be but we
> + * got an exit, also consider the test fully finished.
> + */
> + if (data.got_exit)
> + entry->binary[0] = '\0';
> +
> + return data.pruned > 0;
> +}
> +
> static const char *filenames[_F_LAST] = {
> [_F_JOURNAL] = "journal.txt",
> [_F_OUT] = "out.txt",
> [_F_ERR] = "err.txt",
> [_F_DMESG] = "dmesg.txt",
> + [_F_SOCKET] = "comms",
> };
>
> static int open_at_end(int dirfd, const char *name)
> @@ -478,6 +547,9 @@ bool open_output_files(int dirfd, int *fds, bool write)
>
> for (i = 0; i < _F_LAST; i++) {
> if ((fds[i] = openfunc(dirfd, filenames[i])) < 0) {
> + /* Ignore failure to open socket comms for reading */
> + if (i == _F_SOCKET && !write) continue;
> +
> while (--i >= 0)
> close(fds[i]);
> return false;
> @@ -758,6 +830,16 @@ static int next_kill_signal(int killed)
> }
> }
>
> +static void write_packet_with_canary(int fd, struct runnerpacket *packet, bool sync)
> +{
> + uint32_t canary = socket_dump_canary();
> +
> + write(fd, &canary, sizeof(canary));
> + write(fd, packet, packet->size);
> + if (sync)
> + fdatasync(fd);
> +}
> +
> /*
> * Returns:
> * =0 - Success
> @@ -765,7 +847,8 @@ static int next_kill_signal(int killed)
> * >0 - Timeout happened, need to recreate from journal
> */
> static int monitor_output(pid_t child,
> - int outfd, int errfd, int kmsgfd, int sigfd,
> + int outfd, int errfd, int socketfd,
> + int kmsgfd, int sigfd,
> int *outputs,
> double *time_spent,
> struct settings *settings,
> @@ -787,12 +870,15 @@ static int monitor_output(pid_t child,
> unsigned long taints = 0;
> bool aborting = false;
> size_t disk_usage = 0;
> + bool socket_comms_used = false; /* whether the test actually uses comms */
>
> igt_gettime(&time_beg);
> time_last_activity = time_last_subtest = time_killed = time_beg;
>
> if (errfd > nfds)
> nfds = errfd;
> + if (socketfd > nfds)
> + nfds = socketfd;
> if (kmsgfd > nfds)
> nfds = kmsgfd;
> if (sigfd > nfds)
> @@ -831,6 +917,8 @@ static int monitor_output(pid_t child,
> FD_SET(outfd, &set);
> if (errfd >= 0)
> FD_SET(errfd, &set);
> + if (socketfd >= 0)
> + FD_SET(socketfd, &set);
> if (kmsgfd >= 0)
> FD_SET(kmsgfd, &set);
> if (sigfd >= 0)
> @@ -963,6 +1051,99 @@ static int monitor_output(pid_t child,
> }
> }
>
> + if (socketfd >= 0 && FD_ISSET(socketfd, &set)) {
> + struct runnerpacket *packet;
> +
> + time_last_activity = time_now;
> +
> + /* Fully drain everything */
> + while (true) {
> + s = recv(socketfd, buf, sizeof(buf), MSG_DONTWAIT);
> +
> + if (s < 0) {
> + if (errno == EAGAIN)
> + break;
> +
> + errf("Error reading from communication socket: %m\n");
> +
> + close(socketfd);
> + socketfd = -1;
> + goto socket_end;
> + }
> +
> + packet = (struct runnerpacket *)buf;
> + if (s < sizeof(*packet) || s != packet->size) {
> + errf("Socket communication error: Received %zd bytes, expected %zd\n",
> + s, s >= sizeof(packet->size) ? packet->size : sizeof(*packet));
> + close(socketfd);
> + socketfd = -1;
> + goto socket_end;
> + }
> +
> + write_packet_with_canary(outputs[_F_SOCKET], packet, settings->sync);
> + disk_usage += packet->size;
> +
> + /*
> + * runner sends EXEC itself before executing
> + * the test, other types indicate the test
> + * really uses socket comms
> + */
> + if (packet->type != PACKETTYPE_EXEC)
> + socket_comms_used = true;
> +
> + if (packet->type == PACKETTYPE_SUBTEST_START ||
> + packet->type == PACKETTYPE_DYNAMIC_SUBTEST_START)
> + time_last_subtest = time_now;
> +
> + if (settings->log_level >= LOG_LEVEL_VERBOSE) {
> + runnerpacket_read_helper helper = {};
> + const char *time;
> +
> + if (packet->type == PACKETTYPE_SUBTEST_START ||
> + packet->type == PACKETTYPE_SUBTEST_RESULT ||
> + packet->type == PACKETTYPE_DYNAMIC_SUBTEST_START ||
> + packet->type == PACKETTYPE_DYNAMIC_SUBTEST_RESULT)
> + helper = read_runnerpacket(packet);
> +
> + switch (helper.type) {
> + case PACKETTYPE_SUBTEST_START:
> + if (helper.subteststart.name)
> + outf("Starting subtest: %s\n", helper.subteststart.name);
> + break;
> + case PACKETTYPE_SUBTEST_RESULT:
> + if (helper.subtestresult.name && helper.subtestresult.result) {
> + time = "<unknown>";
> + if (helper.subtestresult.timeused)
> + time = helper.subtestresult.timeused;
> + outf("Subtest %s: %s (%ss)\n",
> + helper.subtestresult.name,
> + helper.subtestresult.result,
> + time);
> + }
> + break;
> + case PACKETTYPE_DYNAMIC_SUBTEST_START:
> + if (helper.dynamicsubteststart.name)
> + outf("Starting dynamic subtest: %s\n", helper.dynamicsubteststart.name);
> + break;
> + case PACKETTYPE_DYNAMIC_SUBTEST_RESULT:
> + if (helper.dynamicsubtestresult.name && helper.dynamicsubtestresult.result) {
> + time = "<unknown>";
> + if (helper.dynamicsubtestresult.timeused)
> + time = helper.dynamicsubtestresult.timeused;
> + outf("Dynamic subtest %s: %s (%ss)\n",
> + helper.dynamicsubtestresult.name,
> + helper.dynamicsubtestresult.result,
> + time);
> + }
> + break;
> + default:
> + break;
> + }
> + }
> + }
> + }
> + socket_end:
> +
> if (kmsgfd >= 0 && FD_ISSET(kmsgfd, &set)) {
> long dmesgwritten;
>
> @@ -1032,11 +1213,23 @@ static int monitor_output(pid_t child,
> if (settings->log_level >= LOG_LEVEL_NORMAL)
> outf("Exiting gracefully, currently running test will have a 'notrun' result\n");
>
> - dprintf(outputs[_F_JOURNAL], "%s%d (%.3fs)\n",
> - EXECUTOR_EXIT,
> - -SIGHUP, 0.0);
> - if (settings->sync)
> - fdatasync(outputs[_F_JOURNAL]);
> + if (socket_comms_used) {
> + struct runnerpacket *message, *override;
> +
> + message = runnerpacket_log(STDOUT_FILENO, "runner: Exiting gracefully, overriding this test's result to be notrun\n");
> + write_packet_with_canary(outputs[_F_SOCKET], message, false); /* possible sync after the override packet */
> + free(message);
> +
> + override = runnerpacket_resultoverride("notrun");
> + write_packet_with_canary(outputs[_F_SOCKET], override, settings->sync);
> + free(override);
> + } else {
> + dprintf(outputs[_F_JOURNAL], "%s%d (%.3fs)\n",
> + EXECUTOR_EXIT,
> + -SIGHUP, 0.0);
> + if (settings->sync)
> + fdatasync(outputs[_F_JOURNAL]);
> + }
> }
>
> aborting = true;
> @@ -1053,9 +1246,10 @@ static int monitor_output(pid_t child,
> time = 0.0;
>
> if (!aborting) {
> - const char *exitline;
> + bool timeoutresult = false;
>
> - exitline = killed ? EXECUTOR_TIMEOUT : EXECUTOR_EXIT;
> + if (killed)
> + timeoutresult = true;
>
> /* If we're stopping because we killed
> * the test for tainting, let's not
> @@ -1069,7 +1263,7 @@ static int monitor_output(pid_t child,
> * journaling a timeout here.
> */
> if (killed && is_tainted(taints)) {
> - exitline = EXECUTOR_EXIT;
> + timeoutresult = false;
>
> /*
> * Also inject a message to
> @@ -1082,11 +1276,22 @@ static int monitor_output(pid_t child,
> * have newlines on both ends
> * of this injection though.
> */
> - dprintf(outputs[_F_OUT],
> - "\nrunner: This test was killed due to a kernel taint (0x%lx).\n",
> - taints);
> - if (settings->sync)
> - fdatasync(outputs[_F_OUT]);
> + if (socket_comms_used) {
> + struct runnerpacket *message;
> + char killmsg[256];
> +
> + snprintf(killmsg, sizeof(killmsg),
> + "runner: This test was killed due to a kernel taint (0x%lx).\n", taints);
> + message = runnerpacket_log(STDOUT_FILENO, killmsg);
> + write_packet_with_canary(outputs[_F_SOCKET], message, settings->sync);
> + free(message);
> + } else {
> + dprintf(outputs[_F_OUT],
> + "\nrunner: This test was killed due to a kernel taint (0x%lx).\n",
> + taints);
> + if (settings->sync)
> + fdatasync(outputs[_F_OUT]);
> + }
> }
>
> /*
> @@ -1094,21 +1299,58 @@ static int monitor_output(pid_t child,
> * exceeded the disk usage limit.
> */
> if (killed && disk_usage_limit_exceeded(settings, disk_usage)) {
> - exitline = EXECUTOR_EXIT;
> - dprintf(outputs[_F_OUT],
> - "\nrunner: This test was killed due to exceeding disk usage limit. "
> - "(Used %zd bytes, limit %zd)\n",
> - disk_usage,
> - settings->disk_usage_limit);
> - if (settings->sync)
> - fdatasync(outputs[_F_OUT]);
> + timeoutresult = false;
> +
> + if (socket_comms_used) {
> + struct runnerpacket *message;
> + char killmsg[256];
> +
> + snprintf(killmsg, sizeof(killmsg),
> + "runner: This test was killed due to exceeding disk usage limit. "
> + "(Used %zd bytes, limit %zd)\n",
> + disk_usage,
> + settings->disk_usage_limit);
> + message = runnerpacket_log(STDOUT_FILENO, killmsg);
> + write_packet_with_canary(outputs[_F_SOCKET], message, settings->sync);
> + free(message);
> + } else {
> + dprintf(outputs[_F_OUT],
> + "\nrunner: This test was killed due to exceeding disk usage limit. "
> + "(Used %zd bytes, limit %zd)\n",
> + disk_usage,
> + settings->disk_usage_limit);
> + if (settings->sync)
> + fdatasync(outputs[_F_OUT]);
> + }
> }
>
> - dprintf(outputs[_F_JOURNAL], "%s%d (%.3fs)\n",
> - exitline,
> - status, time);
> - if (settings->sync) {
> - fdatasync(outputs[_F_JOURNAL]);
> + if (socket_comms_used) {
> + struct runnerpacket *exitpacket;
> + char timestr[32];
> +
> + snprintf(timestr, sizeof(timestr), "%.3f", time);
> +
> + if (timeoutresult) {
> + struct runnerpacket *override;
> +
> + override = runnerpacket_resultoverride("timeout");
> + write_packet_with_canary(outputs[_F_SOCKET], override, false); /* sync after exitpacket */
> + free(override);
> + }
> +
> + exitpacket = runnerpacket_exit(status, timestr);
> + write_packet_with_canary(outputs[_F_SOCKET], exitpacket, settings->sync);
> + free(exitpacket);
> + } else {
> + const char *exitline;
> +
> + exitline = timeoutresult ? EXECUTOR_TIMEOUT : EXECUTOR_EXIT;
> + dprintf(outputs[_F_JOURNAL], "%s%d (%.3fs)\n",
> + exitline,
> + status, time);
> + if (settings->sync) {
> + fdatasync(outputs[_F_JOURNAL]);
> + }
> }
>
> if (status == IGT_EXIT_ABORT) {
> @@ -1153,6 +1395,7 @@ static int monitor_output(pid_t child,
> free(outbuf);
> close(outfd);
> close(errfd);
> + close(socketfd);
> close(kmsgfd);
> return -1;
> }
> @@ -1176,6 +1419,7 @@ static int monitor_output(pid_t child,
> free(outbuf);
> close(outfd);
> close(errfd);
> + close(socketfd);
> close(kmsgfd);
>
> if (aborting)
> @@ -1185,7 +1429,7 @@ static int monitor_output(pid_t child,
> }
>
> static void __attribute__((noreturn))
> -execute_test_process(int outfd, int errfd,
> +execute_test_process(int outfd, int errfd, int socketfd,
> struct settings *settings,
> struct job_list_entry *entry)
> {
> @@ -1237,6 +1481,13 @@ execute_test_process(int outfd, int errfd,
> }
> }
>
> + if (socketfd >= 0) {
> + struct runnerpacket *packet;
> +
> + packet = runnerpacket_exec(argv);
> + write(socketfd, packet, packet->size);
> + }
> +
> execv(argv[0], argv);
> fprintf(stderr, "Cannot execute %s\n", argv[0]);
> exit(IGT_EXIT_INVALID);
> @@ -1318,7 +1569,8 @@ static int execute_next_entry(struct execute_state *state,
> int kmsgfd;
> int outpipe[2] = { -1, -1 };
> int errpipe[2] = { -1, -1 };
> - int outfd, errfd;
> + int socket[2] = { -1, -1 };
> + int outfd, errfd, socketfd;
> char name[32];
> pid_t child;
> int result;
> @@ -1348,6 +1600,12 @@ static int execute_next_entry(struct execute_state *state,
> goto out_pipe;
> }
>
> + if (socketpair(AF_UNIX, SOCK_DGRAM, 0, socket)) {
> + errf("Error creating sockets: %m\n");
> + result = -1;
> + goto out_pipe;
> + }
> +
> if ((kmsgfd = open("/dev/kmsg", O_RDONLY | O_CLOEXEC | O_NONBLOCK)) < 0) {
> errf("Warning: Cannot open /dev/kmsg\n");
> } else {
> @@ -1388,27 +1646,39 @@ static int execute_next_entry(struct execute_state *state,
> result = -1;
> goto out_kmsgfd;
> } else if (child == 0) {
> + char envstring[16];
> +
> outfd = outpipe[1];
> errfd = errpipe[1];
> + socketfd = socket[1];
> close(outpipe[0]);
> close(errpipe[0]);
> + close(socket[0]);
>
> sigprocmask(SIG_UNBLOCK, sigmask, NULL);
>
> + if (socketfd >= 0 && !getenv("IGT_RUNNER_DISABLE_SOCKET_COMMUNICATION")) {
> + snprintf(envstring, sizeof(envstring), "%d", socketfd);
> + setenv("IGT_RUNNER_SOCKET_FD", envstring, 1);
> + }
> setenv("IGT_SENTINEL_ON_STDERR", "1", 1);
>
> - execute_test_process(outfd, errfd, settings, entry);
> + execute_test_process(outfd, errfd, socketfd, settings, entry);
> /* unreachable */
> }
>
> outfd = outpipe[0];
> errfd = errpipe[0];
> + socketfd = socket[0];
> close(outpipe[1]);
> close(errpipe[1]);
> - outpipe[1] = errpipe[1] = -1;
> + close(socket[1]);
> + outpipe[1] = errpipe[1] = socket[1] = -1;
>
> - result = monitor_output(child, outfd, errfd, kmsgfd, sigfd,
> - outputs, time_spent, settings, abortreason);
> + result = monitor_output(child, outfd, errfd, socketfd,
> + kmsgfd, sigfd,
> + outputs, time_spent, settings,
> + abortreason);
>
> out_kmsgfd:
> close(kmsgfd);
> @@ -1580,6 +1850,22 @@ bool initialize_execute_state_from_resume(int dirfd,
>
> entry = &list->entries[i];
> state->next = i;
> +
> + if ((fd = openat(resdirfd, filenames[_F_SOCKET], O_RDONLY)) >= 0) {
> + if (!prune_from_comms(entry, fd)) {
> + /*
> + * No subtests, or incomplete before the first
> + * subtest. Not suitable to re-run.
> + */
> + state->next = i + 1;
> + } else if (entry->binary[0] == '\0') {
> + /* Fully completed */
> + state->next = i + 1;
> + }
> +
> + close (fd);
> + }
> +
> if ((fd = openat(resdirfd, filenames[_F_JOURNAL], O_RDONLY)) >= 0) {
> if (!prune_from_journal(entry, fd)) {
> /*
> diff --git a/runner/executor.h b/runner/executor.h
> index 6c83e649..31f4ac16 100644
> --- a/runner/executor.h
> +++ b/runner/executor.h
> @@ -22,6 +22,7 @@ enum {
> _F_OUT,
> _F_ERR,
> _F_DMESG,
> + _F_SOCKET,
> _F_LAST,
> };
>
> diff --git a/runner/resultgen.c b/runner/resultgen.c
> index 79725ca2..3d753828 100644
> --- a/runner/resultgen.c
> +++ b/runner/resultgen.c
> @@ -1,6 +1,7 @@
> #include <assert.h>
> #include <ctype.h>
> #include <fcntl.h>
> +#include <inttypes.h>
> #include <stdio.h>
> #include <string.h>
> #include <sys/mman.h>
> @@ -12,6 +13,7 @@
>
> #include "igt_aux.h"
> #include "igt_core.h"
> +#include "runnercomms.h"
> #include "resultgen.h"
> #include "settings.h"
> #include "executor.h"
> @@ -147,7 +149,7 @@ static const char *next_line(const char *line, const char *bufend)
> return NULL;
> }
>
> -static void append_line(char **buf, size_t *buflen, char *line)
> +static void append_line(char **buf, size_t *buflen, const char *line)
> {
> size_t linelen = strlen(line);
>
> @@ -1228,6 +1230,606 @@ static void fill_from_journal(int fd,
> fclose(f);
> }
>
> +typedef enum comms_state {
> + STATE_INITIAL = 0,
> + STATE_AFTER_EXEC,
> + STATE_SUBTEST_STARTED,
> + STATE_DYNAMIC_SUBTEST_STARTED,
> + STATE_BETWEEN_DYNAMIC_SUBTESTS,
> + STATE_BETWEEN_SUBTESTS,
> + STATE_EXITED,
> +} comms_state_t;
> +
> +struct comms_context
> +{
> + comms_state_t state;
> +
> + struct json_object *binaryruntimeobj;
> + struct json_object *current_test;
> + struct json_object *current_dynamic_subtest;
> + char *current_subtest_name;
> + char *current_dynamic_subtest_name;
> +
> + char *outbuf, *errbuf;
> + size_t outbuflen, errbuflen;
> + size_t outidx, nextoutidx;
> + size_t erridx, nexterridx;
> + size_t dynoutidx, nextdynoutidx;
> + size_t dynerridx, nextdynerridx;
> +
> + char *igt_version;
> +
> + char *subtestresult;
> + char *dynamicsubtestresult;
> +
> + char *cmdline;
> + int exitcode;
> +
> + struct subtest_list *subtests;
> + struct subtest *subtest;
> + struct results *results;
> + struct job_list_entry *entry;
> + const char *binary;
> +};
> +
> +static void comms_free_context(struct comms_context *context)
> +{
> + free(context->current_subtest_name);
> + free(context->current_dynamic_subtest_name);
> + free(context->outbuf);
> + free(context->errbuf);
> + free(context->igt_version);
> + free(context->subtestresult);
> + free(context->dynamicsubtestresult);
> + free(context->cmdline);
> +}
> +
> +static void comms_inject_subtest_start_log(struct comms_context *context,
> + const char *prefix,
> + const char *subtestname)
> +{
> + char msg[512];
> +
> + snprintf(msg, sizeof(msg), "%s%s\n", prefix, subtestname);
> + append_line(&context->outbuf, &context->outbuflen, msg);
> + append_line(&context->errbuf, &context->errbuflen, msg);
> +}
> +
> +static void comms_inject_subtest_end_log(struct comms_context *context,
> + const char *prefix,
> + const char *subtestname,
> + const char *subtestresult,
> + const char *timeused)
> +{
> + char msg[512];
> +
> + snprintf(msg, sizeof(msg), "%s%s: %s (%ss)\n", prefix, subtestname, subtestresult, timeused);
> + append_line(&context->outbuf, &context->outbuflen, msg);
> + append_line(&context->errbuf, &context->errbuflen, msg);
> +}
> +
> +/*
> + * Finalize the subtest currently being collected in context->current_test.
> + *
> + * Stores the stdout/stderr text gathered since outidx/erridx as the "out"
> + * and "err" fields of the test's JSON object, adds the IGT version if one
> + * is known, and sets the result (defaulting to "incomplete" when no result
> + * or override was recorded). Afterwards the pending result and current_test
> + * are cleared and the log indices advance to nextoutidx/nexterridx so the
> + * next subtest starts collecting from there.
> + */
> +static void comms_finish_subtest(struct comms_context *context)
> +{
> + json_object_object_add(context->current_test, "out",
> + new_escaped_json_string(context->outbuf + context->outidx, context->outbuflen - context->outidx));
> + /*
> + * The stderr slice must start at erridx, matching the length
> + * computed from erridx; using outidx here would mix the two
> + * streams' offsets and could read outside errbuf.
> + */
> + json_object_object_add(context->current_test, "err",
> + new_escaped_json_string(context->errbuf + context->erridx, context->errbuflen - context->erridx));
> +
> + if (context->igt_version)
> + add_igt_version(context->current_test, context->igt_version, strlen(context->igt_version));
> +
> + /* No result packet or override seen: the subtest never finished */
> + if (context->subtestresult == NULL)
> + context->subtestresult = strdup("incomplete");
> + set_result(context->current_test, context->subtestresult);
> +
> + free(context->subtestresult);
> + context->subtestresult = NULL;
> + context->current_test = NULL;
> +
> + context->outidx = context->nextoutidx;
> + context->erridx = context->nexterridx;
> +}
> +
> +static void comms_finish_dynamic_subtest(struct comms_context *context)
> +{
> + json_object_object_add(context->current_dynamic_subtest, "out",
> + new_escaped_json_string(context->outbuf + context->dynoutidx, context->outbuflen - context->dynoutidx));
> + json_object_object_add(context->current_dynamic_subtest, "err",
> + new_escaped_json_string(context->errbuf + context->dynerridx, context->errbuflen - context->dynerridx));
> +
> + if (context->igt_version)
> + add_igt_version(context->current_dynamic_subtest, context->igt_version, strlen(context->igt_version));
> +
> + if (context->dynamicsubtestresult == NULL)
> + context->dynamicsubtestresult = strdup("incomplete");
> + set_result(context->current_dynamic_subtest, context->dynamicsubtestresult);
> +
> + free(context->dynamicsubtestresult);
> + context->dynamicsubtestresult = NULL;
> + context->current_dynamic_subtest = NULL;
> +
> + context->dynoutidx = context->nextdynoutidx;
> + context->dynerridx = context->nextdynerridx;
> +}
> +
> +static void comms_add_new_subtest(struct comms_context *context,
> + const char *subtestname)
> +{
> + char piglit_name[256];
> +
> + add_subtest(context->subtests, strdup(subtestname));
> + context->subtest = &context->subtests->subs[context->subtests->size - 1];
> + generate_piglit_name(context->binary, subtestname, piglit_name, sizeof(piglit_name));
> + context->current_test = get_or_create_json_object(context->results->tests, piglit_name);
> + free(context->current_subtest_name);
> + context->current_subtest_name = strdup(subtestname);
> +}
> +
> +static void comms_add_new_dynamic_subtest(struct comms_context *context,
> + const char *dynamic_name)
> +{
> + char piglit_name[256];
> + char dynamic_piglit_name[256];
> +
> + add_dynamic_subtest(context->subtest, strdup(dynamic_name));
> + generate_piglit_name(context->binary, context->current_subtest_name, piglit_name, sizeof(piglit_name));
> + generate_piglit_name_for_dynamic(piglit_name, dynamic_name, dynamic_piglit_name, sizeof(dynamic_piglit_name));
> + context->current_dynamic_subtest = get_or_create_json_object(context->results->tests, dynamic_piglit_name);
> + free(context->current_dynamic_subtest_name);
> + context->current_dynamic_subtest_name = strdup(dynamic_name);
> +}
> +
> +static bool comms_handle_log(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct comms_context *context = userdata;
> + char **textbuf;
> + size_t *textlen;
> +
> + if (helper.log.stream == STDOUT_FILENO) {
> + textbuf = &context->outbuf;
> + textlen = &context->outbuflen;
> + } else {
> + textbuf = &context->errbuf;
> + textlen = &context->errbuflen;
> + }
> + append_line(textbuf, textlen, helper.log.text);
> +
> + return true;
> +}
> +
> +static bool comms_handle_exec(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct comms_context *context = userdata;
> +
> + switch (context->state) {
> + case STATE_INITIAL:
> + break;
> +
> + case STATE_AFTER_EXEC:
> + /*
> + * Resume after an exec that didn't involve any
> + * subtests. Resumes can only happen for tests with
> + * subtests, so while we might have logs already
> + * collected, we have nowhere to put them. The joblist
> + * doesn't help, because the ordering is up to the
> + * test.
> + */
> + printf("Warning: Need to discard %zd bytes of logs, no subtest data\n", context->outbuflen + context->errbuflen);
> + context->outbuflen = context->errbuflen = 0;
> + context->outidx = context->erridx = 0;
> + context->nextoutidx = context->nexterridx = 0;
> + break;
> +
> + case STATE_SUBTEST_STARTED:
> + case STATE_DYNAMIC_SUBTEST_STARTED:
> + case STATE_BETWEEN_DYNAMIC_SUBTESTS:
> + case STATE_BETWEEN_SUBTESTS:
> + case STATE_EXITED:
> + /* A resume exec, so we're already collecting data. */
> + assert(context->current_test != NULL);
> + comms_finish_subtest(context);
> + break;
> + default:
> + assert(false); /* unreachable */
> + }
> +
> + free(context->cmdline);
> + context->cmdline = strdup(helper.exec.cmdline);
> +
> + context->state = STATE_AFTER_EXEC;
> +
> + return true;
> +}
> +
> +static bool comms_handle_exit(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct comms_context *context = userdata;
> + char piglit_name[256];
> +
> + if (context->state == STATE_AFTER_EXEC) {
> + /*
> + * Exit after exec, so we didn't get any
> + * subtests. Check if there's supposed to be any,
> + * otherwise stuff logs into the binary's result.
> + */
> +
> + char *subtestname = NULL;
> +
> + if (context->entry->subtest_count > 0) {
> + subtestname = context->entry->subtests[0];
> + add_subtest(context->subtests, strdup(subtestname));
> + }
> + generate_piglit_name(context->binary, subtestname, piglit_name, sizeof(piglit_name));
> + context->current_test = get_or_create_json_object(context->results->tests, piglit_name);
> +
> + /* Get result from exitcode unless we have an override already */
> + if (context->subtestresult == NULL)
> + context->subtestresult = strdup(result_from_exitcode(helper.exit.exitcode));
> + } else if (helper.exit.exitcode == IGT_EXIT_ABORT || helper.exit.exitcode == GRACEFUL_EXITCODE) {
> + /*
> + * If we did get subtests, we need to assign the
> + * special exitcode results to the last subtest,
> + * normal and dynamic
> + */
> + const char *result = helper.exit.exitcode == IGT_EXIT_ABORT ? "abort" : "notrun";
> +
> + free(context->subtestresult);
> + context->subtestresult = strdup(result);
> + free(context->dynamicsubtestresult);
> + context->dynamicsubtestresult = strdup(result);
> + }
> +
> + context->exitcode = helper.exit.exitcode;
> + add_runtime(context->binaryruntimeobj, strtod(helper.exit.timeused, NULL));
> +
> + context->state = STATE_EXITED;
> +
> + return true;
> +}
> +
> +static bool comms_handle_subtest_start(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct comms_context *context = userdata;
> + char errmsg[512];
> +
> + switch (context->state) {
> + case STATE_INITIAL:
> + case STATE_EXITED:
> + /* Subtest starts when we're not even running? (Before exec or after exit) */
> + fprintf(stderr, "Error: Unexpected subtest start (binary wasn't running)\n");
> + return false;
> + case STATE_SUBTEST_STARTED:
> + case STATE_DYNAMIC_SUBTEST_STARTED:
> + case STATE_BETWEEN_DYNAMIC_SUBTESTS:
> + /*
> + * Subtest starts when the previous one was still
> + * running. Text-based parsing would figure that a
> + * resume happened, but we know the real deal with
> + * socket comms.
> + */
> + snprintf(errmsg, sizeof(errmsg),
> + "\nrunner: Subtest %s already running when subtest %s starts. This is a test bug.\n",
> + context->current_subtest_name,
> + helper.subteststart.name);
> + append_line(&context->errbuf, &context->errbuflen, errmsg);
> +
> + if (context->state == STATE_DYNAMIC_SUBTEST_STARTED ||
> + context->state == STATE_BETWEEN_DYNAMIC_SUBTESTS)
> + comms_finish_dynamic_subtest(context);
> +
> + /* fallthrough */
> + case STATE_BETWEEN_SUBTESTS:
> + /* Already collecting for a subtest, finish it up */
> + if (context->current_dynamic_subtest)
> + comms_finish_dynamic_subtest(context);
> +
> + comms_finish_subtest(context);
> +
> + /* fallthrough */
> + case STATE_AFTER_EXEC:
> + comms_add_new_subtest(context, helper.subteststart.name);
> +
> + /* Subtest starting message is not in logs with socket comms, inject it manually */
> + comms_inject_subtest_start_log(context, STARTING_SUBTEST, helper.subteststart.name);
> +
> + break;
> + default:
> + assert(false); /* unreachable */
> + }
> +
> + context->state = STATE_SUBTEST_STARTED;
> +
> + return true;
> +}
> +
> +static bool comms_handle_subtest_result(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct comms_context *context = userdata;
> + char errmsg[512];
> +
> + switch (context->state) {
> + case STATE_INITIAL:
> + case STATE_EXITED:
> + /* Subtest result when we're not even running? (Before exec or after exit) */
> + fprintf(stderr, "Error: Unexpected subtest result (binary wasn't running)\n");
> + return false;
> + case STATE_DYNAMIC_SUBTEST_STARTED:
> + /*
> + * Subtest result when dynamic subtest is still
> + * running. Text-based parsing would consider that an
> + * incomplete, we're able to inject a warning.
> + */
> + snprintf(errmsg, sizeof(errmsg),
> + "\nrunner: Dynamic subtest %s still running when subtest %s ended. This is a test bug.\n",
> + context->current_dynamic_subtest_name,
> + helper.subtestresult.name);
> + append_line(&context->errbuf, &context->errbuflen, errmsg);
> + comms_finish_dynamic_subtest(context);
> + break;
> + case STATE_BETWEEN_SUBTESTS:
> + /* Subtest result without starting it, and we're already collecting logs for a previous test */
> + comms_finish_subtest(context);
> + comms_add_new_subtest(context, helper.subtestresult.name);
> + break;
> + case STATE_AFTER_EXEC:
> + /* Subtest result without starting it, so comes from a fixture. We're not yet collecting logs for anything. */
> + comms_add_new_subtest(context, helper.subtestresult.name);
> + break;
> + case STATE_SUBTEST_STARTED:
> + case STATE_BETWEEN_DYNAMIC_SUBTESTS:
> + /* Normal flow */
> + break;
> + default:
> + assert(false); /* unreachable */
> + }
> +
> + comms_inject_subtest_end_log(context,
> + SUBTEST_RESULT,
> + helper.subtestresult.name,
> + helper.subtestresult.result,
> + helper.subtestresult.timeused);
> +
> + /* Next subtest, if any, will begin its logs right after that result line */
> + context->nextoutidx = context->outbuflen;
> + context->nexterridx = context->errbuflen;
> +
> + /*
> + * Only store the actual result from the packet if we don't
> + * already have one. If we do, it's from an override.
> + */
> + if (context->subtestresult == NULL) {
> + const char *mappedresult;
> +
> + parse_result_string(helper.subtestresult.result,
> + strlen(helper.subtestresult.result),
> + &mappedresult, NULL);
> + context->subtestresult = strdup(mappedresult);
> + }
> +
> + context->state = STATE_BETWEEN_SUBTESTS;
> +
> + return true;
> +}
> +
> +static bool comms_handle_dynamic_subtest_start(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct comms_context *context = userdata;
> + char errmsg[512];
> +
> + switch (context->state) {
> + case STATE_INITIAL:
> + case STATE_EXITED:
> + /* Dynamic subtest starts when we're not even running? (Before exec or after exit) */
> + fprintf(stderr, "Error: Unexpected dynamic subtest start (binary wasn't running)\n");
> + return false;
> + case STATE_AFTER_EXEC:
> + /* Binary was running but a subtest wasn't. We don't know where to inject an error message. */
> + fprintf(stderr, "Error: Unexpected dynamic subtest start (subtest wasn't running)\n");
> + return false;
> + case STATE_BETWEEN_SUBTESTS:
> + /*
> + * Dynamic subtest starts when a subtest is not
> + * running. We can't know which subtest this dynamic
> + * subtest was supposed to be in. But we can inject a
> + * warn into the previous subtest.
> + */
> + snprintf(errmsg, sizeof(errmsg),
> + "\nrunner: Dynamic subtest %s started when not inside a subtest. This is a test bug.\n",
> + helper.dynamicsubteststart.name);
> + append_line(&context->errbuf, &context->errbuflen, errmsg);
> +
> + /* Leave the state as is and hope for the best */
> + return true;
> + case STATE_DYNAMIC_SUBTEST_STARTED:
> + snprintf(errmsg, sizeof(errmsg),
> + "\nrunner: Dynamic subtest %s already running when dynamic subtest %s starts. This is a test bug.\n",
> + context->current_dynamic_subtest_name,
> + helper.dynamicsubteststart.name);
> + append_line(&context->errbuf, &context->errbuflen, errmsg);
> +
> + /* fallthrough */
> + case STATE_BETWEEN_DYNAMIC_SUBTESTS:
> + comms_finish_dynamic_subtest(context);
> + /* fallthrough */
> + case STATE_SUBTEST_STARTED:
> + comms_add_new_dynamic_subtest(context, helper.dynamicsubteststart.name);
> +
> + /* Dynamic subtest starting message is not in logs with socket comms, inject it manually */
> + comms_inject_subtest_start_log(context, STARTING_DYNAMIC_SUBTEST, helper.dynamicsubteststart.name);
> +
> + break;
> + default:
> + assert(false); /* unreachable */
> + }
> +
> + context->state = STATE_DYNAMIC_SUBTEST_STARTED;
> +
> + return true;
> +}
> +
> +static bool comms_handle_dynamic_subtest_result(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct comms_context *context = userdata;
> + char errmsg[512];
> +
> + switch (context->state) {
> + case STATE_INITIAL:
> + case STATE_EXITED:
> + /* Dynamic subtest result when we're not even running? (Before exec or after exit) */
> + fprintf(stderr, "Error: Unexpected dynamic subtest result (binary wasn't running)\n");
> + return false;
> + case STATE_AFTER_EXEC:
> + /* Binary was running but a subtest wasn't. We don't know where to inject an error message. */
> + fprintf(stderr, "Error: Unexpected dynamic subtest result (subtest wasn't running)\n");
> + return false;
> + case STATE_BETWEEN_SUBTESTS:
> + /*
> + * Dynamic subtest result when a subtest is not
> + * running. We can't know which subtest this dynamic
> + * subtest was supposed to be in. But we can inject a
> + * warn into the previous subtest.
> + */
> + snprintf(errmsg, sizeof(errmsg),
> + "\nrunner: Dynamic subtest %s result when not inside a subtest. This is a test bug.\n",
> + helper.dynamicsubtestresult.name);
> + append_line(&context->errbuf, &context->errbuflen, errmsg);
> +
> + /* Leave the state as is and hope for the best */
> + return true;
> + case STATE_BETWEEN_DYNAMIC_SUBTESTS:
> + /*
> + * Result without starting. There's no
> + * skip_subtests_henceforth equivalent for dynamic
> + * subtests so this shouldn't happen, but we can
> + * handle it nevertheless.
> + */
> + comms_finish_dynamic_subtest(context);
> + /* fallthrough */
> + case STATE_SUBTEST_STARTED:
> + /* Result without starting, but we aren't collecting for a dynamic subtest yet */
> + comms_add_new_dynamic_subtest(context, helper.dynamicsubtestresult.name);
> + break;
> + case STATE_DYNAMIC_SUBTEST_STARTED:
> + /* Normal flow */
> + break;
> + default:
> + assert(false); /* unreachable */
> + }
> +
> + comms_inject_subtest_end_log(context,
> + DYNAMIC_SUBTEST_RESULT,
> + helper.dynamicsubtestresult.name,
> + helper.dynamicsubtestresult.result,
> + helper.dynamicsubtestresult.timeused);
> +
> + /* Next dynamic subtest, if any, will begin its logs right after that result line */
> + context->nextdynoutidx = context->outbuflen;
> + context->nextdynerridx = context->errbuflen;
> +
> + /*
> + * Only store the actual result from the packet if we don't
> + * already have one. If we do, it's from an override.
> + */
> + if (context->dynamicsubtestresult == NULL) {
> + const char *mappedresult;
> +
> + parse_result_string(helper.dynamicsubtestresult.result,
> + strlen(helper.dynamicsubtestresult.result),
> + &mappedresult, NULL);
> + context->dynamicsubtestresult = strdup(mappedresult);
> + }
> +
> + context->state = STATE_BETWEEN_DYNAMIC_SUBTESTS;
> +
> + return true;
> +}
> +
> +static bool comms_handle_versionstring(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct comms_context *context = userdata;
> +
> + free(context->igt_version);
> + context->igt_version = strdup(helper.versionstring.text);
> +
> + return true;
> +}
> +
> +static bool comms_handle_result_override(const struct runnerpacket *packet,
> + runnerpacket_read_helper helper,
> + void *userdata)
> +{
> + struct comms_context *context = userdata;
> +
> + if (context->current_dynamic_subtest) {
> + free(context->dynamicsubtestresult);
> + context->dynamicsubtestresult = strdup(helper.resultoverride.result);
> + }
> +
> + free(context->subtestresult);
> + context->subtestresult = strdup(helper.resultoverride.result);
> +
> + return true;
> +}
> +
> +static int fill_from_comms(int fd,
> + struct job_list_entry *entry,
> + struct subtest_list *subtests,
> + struct results *results)
> +{
> + struct comms_context context = {};
> + struct comms_visitor visitor = {
> + .log = comms_handle_log,
> + .exec = comms_handle_exec,
> + .exit = comms_handle_exit,
> + .subtest_start = comms_handle_subtest_start,
> + .subtest_result = comms_handle_subtest_result,
> + .dynamic_subtest_start = comms_handle_dynamic_subtest_start,
> + .dynamic_subtest_result = comms_handle_dynamic_subtest_result,
> + .versionstring = comms_handle_versionstring,
> + .result_override = comms_handle_result_override,
> +
> + .userdata = &context,
> + };
> + char piglit_name[256];
> + int ret = COMMSPARSE_EMPTY;
> +
> + if (fd < 0)
> + return COMMSPARSE_EMPTY;
> +
> + context.entry = entry;
> + context.binary = entry->binary;
> + generate_piglit_name(entry->binary, NULL, piglit_name, sizeof(piglit_name));
> + context.binaryruntimeobj = get_or_create_json_object(results->runtimes, piglit_name);
> + context.results = results;
> + context.subtests = subtests;
> +
> + ret = comms_read_dump(fd, &visitor);
> +
> + if (context.current_dynamic_subtest != NULL)
> + comms_finish_dynamic_subtest(&context);
> + if (context.current_test != NULL)
> + comms_finish_subtest(&context);
> + comms_free_context(&context);
> +
> + return ret;
> +}
> +
> static bool result_is_requested(struct job_list_entry *entry,
> const char *subtestname,
> const char *dynamic_name)
> @@ -1505,6 +2107,7 @@ static bool parse_test_directory(int dirfd,
> int fds[_F_LAST];
> struct subtest_list subtests = {};
> bool status = true;
> + int commsparsed;
>
> if (!open_output_files(dirfd, fds, false)) {
> fprintf(stderr, "Error opening output files\n");
> @@ -1517,10 +2120,28 @@ static bool parse_test_directory(int dirfd,
> */
> fill_from_journal(fds[_F_JOURNAL], entry, &subtests, results);
>
> - if (!fill_from_output(fds[_F_OUT], entry->binary, "out", &subtests, results->tests) ||
> - !fill_from_output(fds[_F_ERR], entry->binary, "err", &subtests, results->tests) ||
> - !fill_from_dmesg(fds[_F_DMESG], settings, entry->binary, &subtests, results->tests)) {
> - fprintf(stderr, "Error parsing output files\n");
> + /*
> + * Get test output from socket comms if it exists, otherwise
> + * parse stdout/stderr
> + */
> + commsparsed = fill_from_comms(fds[_F_SOCKET], entry, &subtests, results);
> + if (commsparsed == COMMSPARSE_ERROR) {
> + fprintf(stderr, "Error parsing output files (comms)\n");
> + status = false;
> + goto parse_output_end;
> + }
> +
> + if (commsparsed == COMMSPARSE_EMPTY) {
> + if (!fill_from_output(fds[_F_OUT], entry->binary, "out", &subtests, results->tests) ||
> + !fill_from_output(fds[_F_ERR], entry->binary, "err", &subtests, results->tests)) {
> + fprintf(stderr, "Error parsing output files (out.txt, err.txt)\n");
> + status = false;
> + goto parse_output_end;
> + }
> + }
> +
> + if (!fill_from_dmesg(fds[_F_DMESG], settings, entry->binary, &subtests, results->tests)) {
> + fprintf(stderr, "Error parsing output files (dmesg.txt)\n");
> status = false;
> goto parse_output_end;
> }
> --
> 2.30.2
>
More information about the igt-dev mailing list