[igt-dev] [PATCH i-g-t v2 3/7] runner: Use socket communications

Petri Latvala petri.latvala at intel.com
Mon Oct 31 10:24:30 UTC 2022


Set up a socket and pass it to the tests. Dump received communications
to disk.

When generating results.json, use the socket comms dump if it exists.

Signed-off-by: Petri Latvala <petri.latvala at intel.com>
Cc: Arkadiusz Hiler <arek at hiler.eu>
Acked-by: Kamil Konieczny <kamil.konieczny at linux.intel.com>
---
 runner/executor.c  | 354 ++++++++++++++++++++++---
 runner/executor.h  |   1 +
 runner/resultgen.c | 631 ++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 947 insertions(+), 39 deletions(-)

diff --git a/runner/executor.c b/runner/executor.c
index 8a32fedd..669e8524 100644
--- a/runner/executor.c
+++ b/runner/executor.c
@@ -14,9 +14,11 @@
 #include <stdlib.h>
 #include <string.h>
 #include <sys/ioctl.h>
+#include <sys/mman.h>
 #include <sys/select.h>
 #include <sys/poll.h>
 #include <sys/signalfd.h>
+#include <sys/socket.h>
 #include <sys/stat.h>
 #include <sys/time.h>
 #include <sys/types.h>
@@ -31,6 +33,7 @@
 #include "igt_taints.h"
 #include "executor.h"
 #include "output_strings.h"
+#include "runnercomms.h"
 
 #define KMSG_HEADER "[IGT] "
 #define KMSG_WARN 4
@@ -367,7 +370,7 @@ static char *need_to_abort(const struct settings* settings)
 	return NULL;
 }
 
-static void prune_subtest(struct job_list_entry *entry, char *subtest)
+static void prune_subtest(struct job_list_entry *entry, const char *subtest)
 {
 	char *excl;
 
@@ -444,11 +447,77 @@ static bool prune_from_journal(struct job_list_entry *entry, int fd)
 	return pruned > 0;
 }
 
+struct prune_comms_data /* state shared by the prune_from_comms() visitor callbacks */
+{
+	struct job_list_entry *entry; /* job list entry being pruned */
+	int pruned; /* bumped once per handled subtest start packet */
+	bool got_exit; /* set once an exit packet has been handled */
+};
+
+static bool prune_handle_subtest_start(const struct runnerpacket *packet,
+				       runnerpacket_read_helper helper,
+				       void *userdata)
+{
+	struct prune_comms_data *state = userdata;
+
+	/* Subtest start packet: count it and prune that subtest from the entry. */
+	state->pruned += 1;
+	prune_subtest(state->entry, helper.subteststart.name);
+	return true;
+}
+
+static bool prune_handle_exit(const struct runnerpacket *packet,
+			      runnerpacket_read_helper helper,
+			      void *userdata)
+{
+	/* Exit packet: the only thing to do is remember that one was seen. */
+	struct prune_comms_data *state = userdata;
+
+	state->got_exit = true;
+	return true;
+}
+
+static bool prune_from_comms(struct job_list_entry *entry, int fd)
+{
+	/* Prune entry according to the comms dump readable from fd. */
+	const size_t wanted = entry->subtest_count; /* sampled before the replay */
+	struct prune_comms_data state = { .entry = entry };
+	struct comms_visitor callbacks = {
+		.subtest_start = prune_handle_subtest_start,
+		.exit = prune_handle_exit,
+		.userdata = &state,
+	};
+	bool finished;
+
+	/* Replay the dump through the callbacks; give up on a parse error. */
+	if (comms_read_dump(fd, &callbacks) == COMMSPARSE_ERROR)
+		return false;
+
+	/*
+	 * The binary counts as fully finished when either
+	 *  - the wanted subtests are known and at least that many
+	 *    subtest starts were seen, or
+	 *  - an exit packet was seen, which also covers the case
+	 *    where the number of subtests is not known.
+	 */
+	finished = state.got_exit;
+	if (wanted > 0 && state.pruned >= wanted)
+		finished = true;
+
+	/* An emptied binary name is what marks the entry as complete. */
+	if (finished)
+		entry->binary[0] = '\0';
+
+	/* Tell the caller whether any subtest start was seen at all. */
+	return state.pruned > 0;
+}
+
 static const char *filenames[_F_LAST] = {
 	[_F_JOURNAL] = "journal.txt",
 	[_F_OUT] = "out.txt",
 	[_F_ERR] = "err.txt",
 	[_F_DMESG] = "dmesg.txt",
+	[_F_SOCKET] = "comms", /* dump of the packets received over the runner socket */
 };
 
 static int open_at_end(int dirfd, const char *name)
@@ -480,6 +549,9 @@ bool open_output_files(int dirfd, int *fds, bool write)
 
 	for (i = 0; i < _F_LAST; i++) {
 		if ((fds[i] = openfunc(dirfd, filenames[i])) < 0) {
+			/* Ignore failure to open socket comms for reading */
+			if (i == _F_SOCKET && !write) continue;
+
 			while (--i >= 0)
 				close(fds[i]);
 			return false;
@@ -760,6 +832,16 @@ static int next_kill_signal(int killed)
 	}
 }
 
+static void write_packet_with_canary(int fd, struct runnerpacket *packet, bool sync)
+{
+	uint32_t canary = socket_dump_canary();
+	/* Append the canary value followed by the whole packet to fd. */
+	write(fd, &canary, sizeof(canary)); /* note: the results of both writes are not checked */
+	write(fd, packet, packet->size); /* the packet carries its own total size */
+	if (sync)
+		fdatasync(fd); /* flush to disk only when the caller asks for it */
+}
+
 /*
  * Returns:
  *  =0 - Success
@@ -767,7 +849,8 @@ static int next_kill_signal(int killed)
  *  >0 - Timeout happened, need to recreate from journal
  */
 static int monitor_output(pid_t child,
-			  int outfd, int errfd, int kmsgfd, int sigfd,
+			  int outfd, int errfd, int socketfd,
+			  int kmsgfd, int sigfd,
 			  int *outputs,
 			  double *time_spent,
 			  struct settings *settings,
@@ -789,12 +872,15 @@ static int monitor_output(pid_t child,
 	unsigned long taints = 0;
 	bool aborting = false;
 	size_t disk_usage = 0;
+	bool socket_comms_used = false; /* whether the test actually uses comms */
 
 	igt_gettime(&time_beg);
 	time_last_activity = time_last_subtest = time_killed = time_beg;
 
 	if (errfd > nfds)
 		nfds = errfd;
+	if (socketfd > nfds)
+		nfds = socketfd;
 	if (kmsgfd > nfds)
 		nfds = kmsgfd;
 	if (sigfd > nfds)
@@ -833,6 +919,8 @@ static int monitor_output(pid_t child,
 			FD_SET(outfd, &set);
 		if (errfd >= 0)
 			FD_SET(errfd, &set);
+		if (socketfd >= 0)
+			FD_SET(socketfd, &set);
 		if (kmsgfd >= 0)
 			FD_SET(kmsgfd, &set);
 		if (sigfd >= 0)
@@ -965,6 +1053,99 @@ static int monitor_output(pid_t child,
 			}
 		}
 
+		if (socketfd >= 0 && FD_ISSET(socketfd, &set)) {
+			struct runnerpacket *packet;
+
+			time_last_activity = time_now;
+
+			/* Fully drain everything */
+			while (true) {
+				s = recv(socketfd, buf, sizeof(buf), MSG_DONTWAIT);
+
+				if (s < 0) {
+					if (errno == EAGAIN)
+						break;
+
+					errf("Error reading from communication socket: %m\n");
+
+					close(socketfd);
+					socketfd = -1;
+					goto socket_end;
+				}
+
+				packet = (struct runnerpacket *)buf;
+				if (s < sizeof(*packet) || s != packet->size) {
+					errf("Socket communication error: Received %zd bytes, expected %zd\n",
+					     s, s >= sizeof(packet->size) ? packet->size : sizeof(*packet));
+					close(socketfd);
+					socketfd = -1;
+					goto socket_end;
+				}
+
+				write_packet_with_canary(outputs[_F_SOCKET], packet, settings->sync);
+				disk_usage += packet->size;
+
+				/*
+				 * runner sends EXEC itself before executing
+				 * the test, other types indicate the test
+				 * really uses socket comms
+				 */
+				if (packet->type != PACKETTYPE_EXEC)
+					socket_comms_used = true;
+
+				if (packet->type == PACKETTYPE_SUBTEST_START ||
+				    packet->type == PACKETTYPE_DYNAMIC_SUBTEST_START)
+					time_last_subtest = time_now;
+
+				if (settings->log_level >= LOG_LEVEL_VERBOSE) {
+					runnerpacket_read_helper helper = {};
+					const char *time;
+
+					if (packet->type == PACKETTYPE_SUBTEST_START ||
+					    packet->type == PACKETTYPE_SUBTEST_RESULT ||
+					    packet->type == PACKETTYPE_DYNAMIC_SUBTEST_START ||
+					    packet->type == PACKETTYPE_DYNAMIC_SUBTEST_RESULT)
+						helper = read_runnerpacket(packet);
+
+					switch (helper.type) {
+					case PACKETTYPE_SUBTEST_START:
+						if (helper.subteststart.name)
+							outf("Starting subtest: %s\n", helper.subteststart.name);
+						break;
+					case PACKETTYPE_SUBTEST_RESULT:
+						if (helper.subtestresult.name && helper.subtestresult.result) {
+							time = "<unknown>";
+							if (helper.subtestresult.timeused)
+								time = helper.subtestresult.timeused;
+							outf("Subtest %s: %s (%ss)\n",
+							     helper.subtestresult.name,
+							     helper.subtestresult.result,
+							     time);
+						}
+						break;
+					case PACKETTYPE_DYNAMIC_SUBTEST_START:
+						if (helper.dynamicsubteststart.name)
+							outf("Starting dynamic subtest: %s\n", helper.dynamicsubteststart.name);
+						break;
+					case PACKETTYPE_DYNAMIC_SUBTEST_RESULT:
+						if (helper.dynamicsubtestresult.name && helper.dynamicsubtestresult.result) {
+							time = "<unknown>";
+							if (helper.dynamicsubtestresult.timeused)
+								time = helper.dynamicsubtestresult.timeused;
+							outf("Dynamic subtest %s: %s (%ss)\n",
+							     helper.dynamicsubtestresult.name,
+							     helper.dynamicsubtestresult.result,
+							     time);
+						}
+						break;
+					default:
+						break;
+					}
+				}
+			}
+		}
+	socket_end:
+
 		if (kmsgfd >= 0 && FD_ISSET(kmsgfd, &set)) {
 			long dmesgwritten;
 
@@ -1034,11 +1215,23 @@ static int monitor_output(pid_t child,
 					if (settings->log_level >= LOG_LEVEL_NORMAL)
 						outf("Exiting gracefully, currently running test will have a 'notrun' result\n");
 
-					dprintf(outputs[_F_JOURNAL], "%s%d (%.3fs)\n",
-						EXECUTOR_EXIT,
-						-SIGHUP, 0.0);
-					if (settings->sync)
-						fdatasync(outputs[_F_JOURNAL]);
+					if (socket_comms_used) {
+						struct runnerpacket *message, *override;
+
+						message = runnerpacket_log(STDOUT_FILENO, "runner: Exiting gracefully, overriding this test's result to be notrun\n");
+						write_packet_with_canary(outputs[_F_SOCKET], message, false); /* possible sync after the override packet */
+						free(message);
+
+						override = runnerpacket_resultoverride("notrun");
+						write_packet_with_canary(outputs[_F_SOCKET], override, settings->sync);
+						free(override);
+					} else {
+						dprintf(outputs[_F_JOURNAL], "%s%d (%.3fs)\n",
+							EXECUTOR_EXIT,
+							-SIGHUP, 0.0);
+						if (settings->sync)
+							fdatasync(outputs[_F_JOURNAL]);
+					}
 				}
 
 				aborting = true;
@@ -1055,9 +1248,10 @@ static int monitor_output(pid_t child,
 				time = 0.0;
 
 			if (!aborting) {
-				const char *exitline;
+				bool timeoutresult = false;
 
-				exitline = killed ? EXECUTOR_TIMEOUT : EXECUTOR_EXIT;
+				if (killed)
+					timeoutresult = true;
 
 				/* If we're stopping because we killed
 				 * the test for tainting, let's not
@@ -1071,7 +1265,7 @@ static int monitor_output(pid_t child,
 				 * journaling a timeout here.
 				 */
 				if (killed && is_tainted(taints)) {
-					exitline = EXECUTOR_EXIT;
+					timeoutresult = false;
 
 					/*
 					 * Also inject a message to
@@ -1084,11 +1278,22 @@ static int monitor_output(pid_t child,
 					 * have newlines on both ends
 					 * of this injection though.
 					 */
-					dprintf(outputs[_F_OUT],
-						"\nrunner: This test was killed due to a kernel taint (0x%lx).\n",
-						taints);
-					if (settings->sync)
-						fdatasync(outputs[_F_OUT]);
+					if (socket_comms_used) {
+						struct runnerpacket *message;
+						char killmsg[256];
+
+						snprintf(killmsg, sizeof(killmsg),
+							 "runner: This test was killed due to a kernel taint (0x%lx).\n", taints);
+						message = runnerpacket_log(STDOUT_FILENO, killmsg);
+						write_packet_with_canary(outputs[_F_SOCKET], message, settings->sync);
+						free(message);
+					} else {
+						dprintf(outputs[_F_OUT],
+							"\nrunner: This test was killed due to a kernel taint (0x%lx).\n",
+							taints);
+						if (settings->sync)
+							fdatasync(outputs[_F_OUT]);
+					}
 				}
 
 				/*
@@ -1096,21 +1301,58 @@ static int monitor_output(pid_t child,
 				 * exceeded the disk usage limit.
 				 */
 				if (killed && disk_usage_limit_exceeded(settings, disk_usage)) {
-					exitline = EXECUTOR_EXIT;
-					dprintf(outputs[_F_OUT],
-						"\nrunner: This test was killed due to exceeding disk usage limit. "
-						"(Used %zd bytes, limit %zd)\n",
-						disk_usage,
-						settings->disk_usage_limit);
-					if (settings->sync)
-						fdatasync(outputs[_F_OUT]);
+					timeoutresult = false;
+
+					if (socket_comms_used) {
+						struct runnerpacket *message;
+						char killmsg[256];
+
+						snprintf(killmsg, sizeof(killmsg),
+							 "runner: This test was killed due to exceeding disk usage limit. "
+							 "(Used %zd bytes, limit %zd)\n",
+							 disk_usage,
+							 settings->disk_usage_limit);
+						message = runnerpacket_log(STDOUT_FILENO, killmsg);
+						write_packet_with_canary(outputs[_F_SOCKET], message, settings->sync);
+						free(message);
+					} else {
+						dprintf(outputs[_F_OUT],
+							"\nrunner: This test was killed due to exceeding disk usage limit. "
+							"(Used %zd bytes, limit %zd)\n",
+							disk_usage,
+							settings->disk_usage_limit);
+						if (settings->sync)
+							fdatasync(outputs[_F_OUT]);
+					}
 				}
 
-				dprintf(outputs[_F_JOURNAL], "%s%d (%.3fs)\n",
-					exitline,
-					status, time);
-				if (settings->sync) {
-					fdatasync(outputs[_F_JOURNAL]);
+				if (socket_comms_used) {
+					struct runnerpacket *exitpacket;
+					char timestr[32];
+
+					snprintf(timestr, sizeof(timestr), "%.3f", time);
+
+					if (timeoutresult) {
+						struct runnerpacket *override;
+
+						override = runnerpacket_resultoverride("timeout");
+						write_packet_with_canary(outputs[_F_SOCKET], override, false); /* sync after exitpacket */
+						free(override);
+					}
+
+					exitpacket = runnerpacket_exit(status, timestr);
+					write_packet_with_canary(outputs[_F_SOCKET], exitpacket, settings->sync);
+					free(exitpacket);
+				} else {
+					const char *exitline;
+
+					exitline = timeoutresult ? EXECUTOR_TIMEOUT : EXECUTOR_EXIT;
+					dprintf(outputs[_F_JOURNAL], "%s%d (%.3fs)\n",
+						exitline,
+						status, time);
+					if (settings->sync) {
+						fdatasync(outputs[_F_JOURNAL]);
+					}
 				}
 
 				if (status == IGT_EXIT_ABORT) {
@@ -1155,6 +1397,7 @@ static int monitor_output(pid_t child,
 				free(outbuf);
 				close(outfd);
 				close(errfd);
+				close(socketfd);
 				close(kmsgfd);
 				return -1;
 			}
@@ -1178,6 +1421,7 @@ static int monitor_output(pid_t child,
 	free(outbuf);
 	close(outfd);
 	close(errfd);
+	close(socketfd);
 	close(kmsgfd);
 
 	if (aborting)
@@ -1187,7 +1431,7 @@ static int monitor_output(pid_t child,
 }
 
 static void __attribute__((noreturn))
-execute_test_process(int outfd, int errfd,
+execute_test_process(int outfd, int errfd, int socketfd,
 		     struct settings *settings,
 		     struct job_list_entry *entry)
 {
@@ -1239,6 +1483,13 @@ execute_test_process(int outfd, int errfd,
 		}
 	}
 
+	if (socketfd >= 0) {
+		struct runnerpacket *packet;
+
+		packet = runnerpacket_exec(argv);
+		write(socketfd, packet, packet->size);
+	}
+
 	execv(argv[0], argv);
 	fprintf(stderr, "Cannot execute %s\n", argv[0]);
 	exit(IGT_EXIT_INVALID);
@@ -1320,7 +1571,8 @@ static int execute_next_entry(struct execute_state *state,
 	int kmsgfd;
 	int outpipe[2] = { -1, -1 };
 	int errpipe[2] = { -1, -1 };
-	int outfd, errfd;
+	int socket[2] = { -1, -1 };
+	int outfd, errfd, socketfd;
 	char name[32];
 	pid_t child;
 	int result;
@@ -1350,6 +1602,12 @@ static int execute_next_entry(struct execute_state *state,
 		goto out_pipe;
 	}
 
+	if (socketpair(AF_UNIX, SOCK_DGRAM, 0, socket)) {
+		errf("Error creating sockets: %m\n");
+		result = -1;
+		goto out_pipe;
+	}
+
 	if ((kmsgfd = open("/dev/kmsg", O_RDONLY | O_CLOEXEC | O_NONBLOCK)) < 0) {
 		errf("Warning: Cannot open /dev/kmsg\n");
 	} else {
@@ -1390,27 +1648,39 @@ static int execute_next_entry(struct execute_state *state,
 		result = -1;
 		goto out_kmsgfd;
 	} else if (child == 0) {
+		char envstring[16];
+
 		outfd = outpipe[1];
 		errfd = errpipe[1];
+		socketfd = socket[1];
 		close(outpipe[0]);
 		close(errpipe[0]);
+		close(socket[0]);
 
 		sigprocmask(SIG_UNBLOCK, sigmask, NULL);
 
+		if (socketfd >= 0 && !getenv("IGT_RUNNER_DISABLE_SOCKET_COMMUNICATION")) {
+			snprintf(envstring, sizeof(envstring), "%d", socketfd);
+			setenv("IGT_RUNNER_SOCKET_FD", envstring, 1);
+		}
 		setenv("IGT_SENTINEL_ON_STDERR", "1", 1);
 
-		execute_test_process(outfd, errfd, settings, entry);
+		execute_test_process(outfd, errfd, socketfd, settings, entry);
 		/* unreachable */
 	}
 
 	outfd = outpipe[0];
 	errfd = errpipe[0];
+	socketfd = socket[0];
 	close(outpipe[1]);
 	close(errpipe[1]);
-	outpipe[1] = errpipe[1] = -1;
+	close(socket[1]);
+	outpipe[1] = errpipe[1] = socket[1] = -1;
 
-	result = monitor_output(child, outfd, errfd, kmsgfd, sigfd,
-				outputs, time_spent, settings, abortreason);
+	result = monitor_output(child, outfd, errfd, socketfd,
+				kmsgfd, sigfd,
+				outputs, time_spent, settings,
+				abortreason);
 
 out_kmsgfd:
 	close(kmsgfd);
@@ -1582,6 +1852,22 @@ bool initialize_execute_state_from_resume(int dirfd,
 
 	entry = &list->entries[i];
 	state->next = i;
+
+	if ((fd = openat(resdirfd, filenames[_F_SOCKET], O_RDONLY)) >= 0) {
+		if (!prune_from_comms(entry, fd)) {
+			/*
+			 * No subtests, or incomplete before the first
+			 * subtest. Not suitable to re-run.
+			 */
+			state->next = i + 1;
+		} else if (entry->binary[0] == '\0') {
+			/* Full completed */
+			state->next = i + 1;
+		}
+
+		close (fd);
+	}
+
 	if ((fd = openat(resdirfd, filenames[_F_JOURNAL], O_RDONLY)) >= 0) {
 		if (!prune_from_journal(entry, fd)) {
 			/*
diff --git a/runner/executor.h b/runner/executor.h
index 6c83e649..31f4ac16 100644
--- a/runner/executor.h
+++ b/runner/executor.h
@@ -22,6 +22,7 @@ enum {
 	_F_OUT,
 	_F_ERR,
 	_F_DMESG,
+	_F_SOCKET,
 	_F_LAST,
 };
 
diff --git a/runner/resultgen.c b/runner/resultgen.c
index 79725ca2..3d753828 100644
--- a/runner/resultgen.c
+++ b/runner/resultgen.c
@@ -1,6 +1,7 @@
 #include <assert.h>
 #include <ctype.h>
 #include <fcntl.h>
+#include <inttypes.h>
 #include <stdio.h>
 #include <string.h>
 #include <sys/mman.h>
@@ -12,6 +13,7 @@
 
 #include "igt_aux.h"
 #include "igt_core.h"
+#include "runnercomms.h"
 #include "resultgen.h"
 #include "settings.h"
 #include "executor.h"
@@ -147,7 +149,7 @@ static const char *next_line(const char *line, const char *bufend)
 		return NULL;
 }
 
-static void append_line(char **buf, size_t *buflen, char *line)
+static void append_line(char **buf, size_t *buflen, const char *line)
 {
 	size_t linelen = strlen(line);
 
@@ -1228,6 +1230,606 @@ static void fill_from_journal(int fd,
 	fclose(f);
 }
 
+typedef enum comms_state { /* where the replay of a comms dump currently stands */
+	STATE_INITIAL = 0, /* no exec packet handled yet */
+	STATE_AFTER_EXEC, /* exec handled, no subtest packet since */
+	STATE_SUBTEST_STARTED, /* subtest start handled, its result is pending */
+	STATE_DYNAMIC_SUBTEST_STARTED, /* dynamic subtest start handled, its result is pending */
+	STATE_BETWEEN_DYNAMIC_SUBTESTS, /* presumably entered after a dynamic subtest result - confirm in that handler */
+	STATE_BETWEEN_SUBTESTS, /* subtest result handled */
+	STATE_EXITED, /* exit packet handled */
+} comms_state_t;
+
+struct comms_context /* everything the comms_handle_*() callbacks share while replaying a dump */
+{
+	comms_state_t state; /* position in the packet state machine */
+
+	struct json_object *binaryruntimeobj; /* add_runtime() target for the exit packet's time */
+	struct json_object *current_test; /* result object of the subtest being collected */
+	struct json_object *current_dynamic_subtest; /* result object of the dynamic subtest being collected */
+	char *current_subtest_name; /* strdup()ed copy, released by comms_free_context() */
+	char *current_dynamic_subtest_name; /* strdup()ed copy, released by comms_free_context() */
+	/* Accumulated log text plus the offsets that slice it per (dynamic) subtest. */
+	char *outbuf, *errbuf; /* stdout text / text of every other stream */
+	size_t outbuflen, errbuflen;
+	size_t outidx, nextoutidx; /* start of the current / of the following subtest's text in outbuf */
+	size_t erridx, nexterridx; /* same for errbuf */
+	size_t dynoutidx, nextdynoutidx; /* same as outidx/nextoutidx, for dynamic subtests */
+	size_t dynerridx, nextdynerridx; /* same as erridx/nexterridx, for dynamic subtests */
+
+	char *igt_version; /* when set, added to every finished (dynamic) subtest */
+
+	char *subtestresult; /* result waiting for comms_finish_subtest(); NULL if none yet */
+	char *dynamicsubtestresult; /* result waiting for comms_finish_dynamic_subtest(); NULL if none yet */
+
+	char *cmdline; /* copy of the latest exec packet's command line */
+	int exitcode; /* exit code from the latest exit packet */
+
+	struct subtest_list *subtests; /* list that add_subtest() appends to */
+	struct subtest *subtest; /* element of subtests for the current subtest */
+	struct results *results; /* its tests object holds the per-test result objects */
+	struct job_list_entry *entry; /* job list entry whose dump is being replayed */
+	const char *binary; /* binary name handed to generate_piglit_name() */
+};
+
+static void comms_free_context(struct comms_context *context)
+{
+	/* Heap strings owned by context; context itself is not freed here. */
+	char *owned[] = {
+		context->current_subtest_name, context->current_dynamic_subtest_name,
+		context->outbuf, context->errbuf, context->igt_version,
+		context->subtestresult, context->dynamicsubtestresult, context->cmdline,
+	};
+	for (size_t i = 0; i < sizeof(owned) / sizeof(owned[0]); i++)
+		free(owned[i]);
+}
+
+static void comms_inject_subtest_start_log(struct comms_context *context,
+					   const char *prefix,
+					   const char *subtestname)
+{
+	char line[512]; /* "<prefix><subtestname>\n"; snprintf truncates longer text */
+
+	snprintf(line, sizeof line, "%s%s\n", prefix, subtestname);
+	append_line(&context->outbuf, &context->outbuflen, line); /* stdout log */
+	append_line(&context->errbuf, &context->errbuflen, line); /* stderr log */
+}
+
+static void comms_inject_subtest_end_log(struct comms_context *context,
+					 const char *prefix,
+					 const char *subtestname,
+					 const char *subtestresult,
+					 const char *timeused)
+{
+	char line[512]; /* "<prefix><name>: <result> (<time>s)\n", truncated if longer */
+
+	snprintf(line, sizeof line, "%s%s: %s (%ss)\n", prefix, subtestname, subtestresult, timeused);
+	append_line(&context->outbuf, &context->outbuflen, line); /* stdout log */
+	append_line(&context->errbuf, &context->errbuflen, line); /* stderr log */
+}
+
+static void comms_finish_subtest(struct comms_context *context)
+{ /* Store the collected logs, the IGT version and the result in current_test. */
+	json_object_object_add(context->current_test, "out",
+			       new_escaped_json_string(context->outbuf + context->outidx, context->outbuflen - context->outidx));
+	json_object_object_add(context->current_test, "err",
+			       new_escaped_json_string(context->errbuf + context->erridx, context->errbuflen - context->erridx));
+	/* stderr is sliced with erridx: the out and err offsets advance separately. */
+	if (context->igt_version)
+		add_igt_version(context->current_test, context->igt_version, strlen(context->igt_version));
+	/* Finishing without any recorded result means the subtest is incomplete. */
+	if (context->subtestresult == NULL)
+		context->subtestresult = strdup("incomplete");
+	set_result(context->current_test, context->subtestresult);
+	/* The result has been applied; nothing is current until a new subtest is added. */
+	free(context->subtestresult);
+	context->subtestresult = NULL;
+	context->current_test = NULL;
+	/* Logs of the next subtest begin where the last result line ended. */
+	context->outidx = context->nextoutidx;
+	context->erridx = context->nexterridx;
+}
+
+static void comms_finish_dynamic_subtest(struct comms_context *context)
+{
+	struct json_object *dyn = context->current_dynamic_subtest; /* object being completed */
+	/* Hand over this dynamic subtest's slice of both logs, then version and result. */
+	json_object_object_add(dyn, "out",
+			       new_escaped_json_string(context->outbuf + context->dynoutidx, context->outbuflen - context->dynoutidx));
+	json_object_object_add(dyn, "err",
+			       new_escaped_json_string(context->errbuf + context->dynerridx, context->errbuflen - context->dynerridx));
+	if (context->igt_version)
+		add_igt_version(dyn, context->igt_version, strlen(context->igt_version));
+	/* No result recorded by now means the dynamic subtest is incomplete. */
+	if (!context->dynamicsubtestresult)
+		context->dynamicsubtestresult = strdup("incomplete");
+	set_result(dyn, context->dynamicsubtestresult);
+	free(context->dynamicsubtestresult);
+	context->dynamicsubtestresult = NULL;
+	context->current_dynamic_subtest = NULL;
+	/* The following dynamic subtest's logs start at the remembered offsets. */
+	context->dynoutidx = context->nextdynoutidx;
+	context->dynerridx = context->nextdynerridx;
+}
+
+static void comms_add_new_subtest(struct comms_context *context,
+				  const char *subtestname)
+{
+	char key[256]; /* piglit name of the subtest, the key into results->tests */
+
+	add_subtest(context->subtests, strdup(subtestname));
+	context->subtest = context->subtests->subs + (context->subtests->size - 1);
+	generate_piglit_name(context->binary, subtestname, key, sizeof key);
+	context->current_test = get_or_create_json_object(context->results->tests, key);
+	free(context->current_subtest_name); /* replace the previously remembered name */
+	context->current_subtest_name = strdup(subtestname);
+}
+
+static void comms_add_new_dynamic_subtest(struct comms_context *context,
+					  const char *dynamic_name)
+{
+	char parent_key[256]; /* piglit name of the enclosing subtest */
+	char key[256]; /* piglit name of the dynamic subtest, the key into results->tests */
+
+	add_dynamic_subtest(context->subtest, strdup(dynamic_name));
+	generate_piglit_name(context->binary, context->current_subtest_name, parent_key, sizeof parent_key);
+	generate_piglit_name_for_dynamic(parent_key, dynamic_name, key, sizeof key);
+	context->current_dynamic_subtest = get_or_create_json_object(context->results->tests, key);
+	free(context->current_dynamic_subtest_name); /* replace the previously remembered name */
+	context->current_dynamic_subtest_name = strdup(dynamic_name);
+}
+
+static bool comms_handle_log(const struct runnerpacket *packet,
+			     runnerpacket_read_helper helper,
+			     void *userdata)
+{
+	struct comms_context *ctx = userdata;
+	const bool to_stdout = helper.log.stream == STDOUT_FILENO;
+
+	/*
+	 * Visitor callback for log packets: text that was logged
+	 * to stdout is appended to outbuf, text from any other
+	 * stream is appended to errbuf.
+	 */
+	if (to_stdout)
+		append_line(&ctx->outbuf, &ctx->outbuflen, helper.log.text);
+	else
+		append_line(&ctx->errbuf, &ctx->errbuflen, helper.log.text);
+
+	return true;
+}
+
+static bool comms_handle_exec(const struct runnerpacket *packet,
+			      runnerpacket_read_helper helper,
+			      void *userdata)
+{
+	struct comms_context *context = userdata;
+	/* Exec packet: deal with data left over from an earlier execution, then store the new cmdline. */
+	switch (context->state) {
+	case STATE_INITIAL:
+		break; /* very first exec: nothing has been collected yet */
+
+	case STATE_AFTER_EXEC:
+		/*
+		 * Resume after an exec that didn't involve any
+		 * subtests. Resumes can only happen for tests with
+		 * subtests, so while we might have logs already
+		 * collected, we have nowhere to put them. The joblist
+		 * doesn't help, because the ordering is up to the
+		 * test.
+		 */
+		printf("Warning: Need to discard %zu bytes of logs, no subtest data\n", context->outbuflen + context->errbuflen); /* size_t sum, hence %zu */
+		context->outbuflen = context->errbuflen = 0;
+		context->outidx = context->erridx = 0;
+		context->nextoutidx = context->nexterridx = 0;
+		break;
+
+	case STATE_SUBTEST_STARTED:
+	case STATE_DYNAMIC_SUBTEST_STARTED:
+	case STATE_BETWEEN_DYNAMIC_SUBTESTS:
+	case STATE_BETWEEN_SUBTESTS:
+	case STATE_EXITED:
+		/* A resume exec, so we're already collecting data. */
+		assert(context->current_test != NULL);
+		comms_finish_subtest(context);
+		break;
+	default:
+		assert(false); /* unreachable */
+	}
+	/* Replace any previously stored command line with a copy of this one. */
+	free(context->cmdline);
+	context->cmdline = strdup(helper.exec.cmdline);
+
+	context->state = STATE_AFTER_EXEC;
+
+	return true;
+}
+
+static bool comms_handle_exit(const struct runnerpacket *packet,
+			      runnerpacket_read_helper helper,
+			      void *userdata)
+{
+	struct comms_context *context = userdata;
+	char piglit_name[256];
+	/* Exit packet: settle the pending result, then record the exit code and the runtime. */
+	if (context->state == STATE_AFTER_EXEC) {
+		/*
+		 * Exit after exec, so we didn't get any
+		 * subtests. Check if there's supposed to be any,
+		 * otherwise stuff logs into the binary's result.
+		 */
+
+		char *subtestname = NULL; /* stays NULL when no subtests were requested */
+
+		if (context->entry->subtest_count > 0) {
+			subtestname = context->entry->subtests[0]; /* use the first requested subtest */
+			add_subtest(context->subtests, strdup(subtestname));
+		}
+		generate_piglit_name(context->binary, subtestname, piglit_name, sizeof(piglit_name));
+		context->current_test = get_or_create_json_object(context->results->tests, piglit_name);
+
+		/* Get result from exitcode unless we have an override already */
+		if (context->subtestresult == NULL)
+			context->subtestresult = strdup(result_from_exitcode(helper.exit.exitcode));
+	} else if (helper.exit.exitcode == IGT_EXIT_ABORT || helper.exit.exitcode == GRACEFUL_EXITCODE) {
+		/*
+		 * If we did get subtests, we need to assign the
+		 * special exitcode results to the last subtest,
+		 * normal and dynamic
+		 */
+		const char *result = helper.exit.exitcode == IGT_EXIT_ABORT ? "abort" : "notrun";
+		/* Any result stored earlier is replaced, for both levels. */
+		free(context->subtestresult);
+		context->subtestresult = strdup(result);
+		free(context->dynamicsubtestresult);
+		context->dynamicsubtestresult = strdup(result);
+	}
+
+	context->exitcode = helper.exit.exitcode;
+	add_runtime(context->binaryruntimeobj, strtod(helper.exit.timeused, NULL)); /* timeused arrives as text */
+
+	context->state = STATE_EXITED;
+
+	return true;
+}
+
+static bool comms_handle_subtest_start(const struct runnerpacket *packet,
+				       runnerpacket_read_helper helper,
+				       void *userdata)
+{
+	struct comms_context *context = userdata;
+	char errmsg[512];
+	/* Subtest start packet: close whatever is still open, then begin collecting for the new subtest. */
+	switch (context->state) {
+	case STATE_INITIAL:
+	case STATE_EXITED:
+		/* Subtest starts when we're not even running? (Before exec or after exit) */
+		fprintf(stderr, "Error: Unexpected subtest start (binary wasn't running)\n");
+		return false;
+	case STATE_SUBTEST_STARTED:
+	case STATE_DYNAMIC_SUBTEST_STARTED:
+	case STATE_BETWEEN_DYNAMIC_SUBTESTS:
+		/*
+		 * Subtest starts when the previous one was still
+		 * running. Text-based parsing would figure that a
+		 * resume happened, but we know the real deal with
+		 * socket comms.
+		 */
+		snprintf(errmsg, sizeof(errmsg),
+			 "\nrunner: Subtest %s already running when subtest %s starts. This is a test bug.\n",
+			 context->current_subtest_name,
+			 helper.subteststart.name);
+		append_line(&context->errbuf, &context->errbuflen, errmsg); /* the warning goes into the stderr text */
+		/* An open dynamic subtest is closed before its parent subtest. */
+		if (context->state == STATE_DYNAMIC_SUBTEST_STARTED ||
+		    context->state == STATE_BETWEEN_DYNAMIC_SUBTESTS)
+			comms_finish_dynamic_subtest(context);
+
+		/* fallthrough */
+	case STATE_BETWEEN_SUBTESTS:
+		/* Already collecting for a subtest, finish it up */
+		if (context->current_dynamic_subtest)
+			comms_finish_dynamic_subtest(context);
+
+		comms_finish_subtest(context);
+
+		/* fallthrough */
+	case STATE_AFTER_EXEC:
+		comms_add_new_subtest(context, helper.subteststart.name);
+
+		/* Subtest starting message is not in logs with socket comms, inject it manually */
+		comms_inject_subtest_start_log(context, STARTING_SUBTEST, helper.subteststart.name);
+
+		break;
+	default:
+		assert(false); /* unreachable */
+	}
+
+	context->state = STATE_SUBTEST_STARTED;
+
+	return true;
+}
+
+static bool comms_handle_subtest_result(const struct runnerpacket *packet,
+					runnerpacket_read_helper helper,
+					void *userdata)
+{
+	struct comms_context *context = userdata;
+	char errmsg[512];
+	/* Subtest result packet: line up the subtest it belongs to, log the result line, keep the result. */
+	switch (context->state) {
+	case STATE_INITIAL:
+	case STATE_EXITED:
+		/* Subtest result when we're not even running? (Before exec or after exit) */
+		fprintf(stderr, "Error: Unexpected subtest result (binary wasn't running)\n");
+		return false;
+	case STATE_DYNAMIC_SUBTEST_STARTED:
+		/*
+		 * Subtest result when dynamic subtest is still
+		 * running. Text-based parsing would consider that an
+		 * incomplete, we're able to inject a warning.
+		 */
+		snprintf(errmsg, sizeof(errmsg),
+			 "\nrunner: Dynamic subtest %s still running when subtest %s ended. This is a test bug.\n",
+			 context->current_dynamic_subtest_name,
+			 helper.subtestresult.name);
+		append_line(&context->errbuf, &context->errbuflen, errmsg); /* the warning goes into the stderr text */
+		comms_finish_dynamic_subtest(context);
+		break;
+	case STATE_BETWEEN_SUBTESTS:
+		/* Subtest result without starting it, and we're already collecting logs for a previous test */
+		comms_finish_subtest(context);
+		comms_add_new_subtest(context, helper.subtestresult.name);
+		break;
+	case STATE_AFTER_EXEC:
+		/* Subtest result without starting it, so comes from a fixture. We're not yet collecting logs for anything. */
+		comms_add_new_subtest(context, helper.subtestresult.name);
+		break;
+	case STATE_SUBTEST_STARTED:
+	case STATE_BETWEEN_DYNAMIC_SUBTESTS:
+		/* Normal flow */
+		break;
+	default:
+		assert(false); /* unreachable */
+	}
+	/* Add the textual result line to both the stdout and the stderr text. */
+	comms_inject_subtest_end_log(context,
+				     SUBTEST_RESULT,
+				     helper.subtestresult.name,
+				     helper.subtestresult.result,
+				     helper.subtestresult.timeused);
+
+	/* Next subtest, if any, will begin its logs right after that result line */
+	context->nextoutidx = context->outbuflen;
+	context->nexterridx = context->errbuflen;
+
+	/*
+	 * Only store the actual result from the packet if we don't
+	 * already have one. If we do, it's from an override.
+	 */
+	if (context->subtestresult == NULL) {
+		const char *mappedresult;
+		/* No time output is requested here, hence the NULL last argument. */
+		parse_result_string(helper.subtestresult.result,
+				    strlen(helper.subtestresult.result),
+				    &mappedresult, NULL);
+		context->subtestresult = strdup(mappedresult); /* the context keeps its own copy */
+	}
+
+	context->state = STATE_BETWEEN_SUBTESTS;
+
+	return true;
+}
+
+static bool comms_handle_dynamic_subtest_start(const struct runnerpacket *packet,
+					       runnerpacket_read_helper helper,
+					       void *userdata)
+{
+	struct comms_context *context = userdata;
+	char errmsg[512];
+	/* Dynamic subtest start packet: close a previous dynamic subtest if needed, then begin the new one. */
+	switch (context->state) {
+	case STATE_INITIAL:
+	case STATE_EXITED:
+		/* Dynamic subtest starts when we're not even running? (Before exec or after exit) */
+		fprintf(stderr, "Error: Unexpected dynamic subtest start (binary wasn't running)\n");
+		return false;
+	case STATE_AFTER_EXEC:
+		/* Binary was running but a subtest wasn't. We don't know where to inject an error message. */
+		fprintf(stderr, "Error: Unexpected dynamic subtest start (subtest wasn't running)\n");
+		return false;
+	case STATE_BETWEEN_SUBTESTS:
+		/*
+		 * Dynamic subtest starts when a subtest is not
+		 * running. We can't know which subtest this dynamic
+		 * subtest was supposed to be in. But we can inject a
+		 * warn into the previous subtest.
+		 */
+		snprintf(errmsg, sizeof(errmsg),
+			 "\nrunner: Dynamic subtest %s started when not inside a subtest. This is a test bug.\n",
+			 helper.dynamicsubteststart.name);
+		append_line(&context->errbuf, &context->errbuflen, errmsg); /* the warning goes into the stderr text */
+
+		/* Leave the state as is and hope for the best */
+		return true;
+	case STATE_DYNAMIC_SUBTEST_STARTED:
+		snprintf(errmsg, sizeof(errmsg),
+			 "\nrunner: Dynamic subtest %s already running when dynamic subtest %s starts. This is a test bug.\n",
+			 context->current_dynamic_subtest_name,
+			 helper.dynamicsubteststart.name);
+		append_line(&context->errbuf, &context->errbuflen, errmsg); /* the warning goes into the stderr text */
+
+		/* fallthrough */
+	case STATE_BETWEEN_DYNAMIC_SUBTESTS:
+		comms_finish_dynamic_subtest(context); /* close the previous dynamic subtest first */
+		/* fallthrough */
+	case STATE_SUBTEST_STARTED:
+		comms_add_new_dynamic_subtest(context, helper.dynamicsubteststart.name);
+
+		/* Dynamic subtest starting message is not in logs with socket comms, inject it manually */
+		comms_inject_subtest_start_log(context, STARTING_DYNAMIC_SUBTEST, helper.dynamicsubteststart.name);
+
+		break;
+	default:
+		assert(false); /* unreachable */
+	}
+
+	context->state = STATE_DYNAMIC_SUBTEST_STARTED;
+
+	return true;
+}
+
+/*
+ * Visitor callback for a "dynamic subtest result" packet.
+ *
+ * userdata is the struct comms_context being filled in; the packet
+ * argument itself is not used, all data is read through helper.
+ *
+ * What happens depends on context->state:
+ *  - STATE_INITIAL, STATE_EXITED, STATE_AFTER_EXEC: print an error to
+ *    stderr and return false.
+ *  - STATE_BETWEEN_SUBTESTS: append a complaint to the collected
+ *    stderr buffer, leave the state untouched and return true.
+ *  - STATE_BETWEEN_DYNAMIC_SUBTESTS, STATE_SUBTEST_STARTED: the
+ *    result arrives without a preceding start, so a dynamic subtest
+ *    is registered on the spot before the result is recorded.
+ *  - STATE_DYNAMIC_SUBTEST_STARTED: the result is recorded for the
+ *    dynamic subtest already in progress.
+ *
+ * In the last three cases the result line is injected into the logs,
+ * the log start indexes for the next dynamic subtest are updated,
+ * the state becomes STATE_BETWEEN_DYNAMIC_SUBTESTS and true is
+ * returned.
+ *
+ * NOTE(review): the effect of returning false is decided by the code
+ * that invokes the visitor, which is not visible here - presumably
+ * parsing is aborted; confirm against comms_read_dump().
+ */
+static bool comms_handle_dynamic_subtest_result(const struct runnerpacket *packet,
+						runnerpacket_read_helper helper,
+						void *userdata)
+{
+	struct comms_context *context = userdata;
+	char errmsg[512]; /* snprintf() below truncates messages that don't fit */
+
+	switch (context->state) {
+	case STATE_INITIAL:
+	case STATE_EXITED:
+		/* Dynamic subtest result when we're not even running? (Before exec or after exit) */
+		fprintf(stderr, "Error: Unexpected dynamic subtest result (binary wasn't running)\n");
+		return false;
+	case STATE_AFTER_EXEC:
+		/* Binary was running but a subtest wasn't. We don't know where to inject an error message. */
+		fprintf(stderr, "Error: Unexpected dynamic subtest result (subtest wasn't running)\n");
+		return false;
+	case STATE_BETWEEN_SUBTESTS:
+		/*
+		 * Dynamic subtest result when a subtest is not
+		 * running. We can't know which subtest this dynamic
+		 * subtest was supposed to be in. But we can inject a
+		 * warn into the previous subtest.
+		 */
+		snprintf(errmsg, sizeof(errmsg),
+			 "\nrunner: Dynamic subtest %s result when not inside a subtest. This is a test bug.\n",
+			 helper.dynamicsubtestresult.name);
+		append_line(&context->errbuf, &context->errbuflen, errmsg);
+
+		/* Leave the state as is and hope for the best */
+		return true;
+	case STATE_BETWEEN_DYNAMIC_SUBTESTS:
+		/*
+		 * Result without starting. There's no
+		 * skip_subtests_henceforth equivalent for dynamic
+		 * subtests so this shouldn't happen, but we can
+		 * handle it nevertheless.
+		 */
+		comms_finish_dynamic_subtest(context);
+		/* fallthrough */
+	case STATE_SUBTEST_STARTED:
+		/* Result without starting, but we aren't collecting for a dynamic subtest yet */
+		comms_add_new_dynamic_subtest(context, helper.dynamicsubtestresult.name);
+		break;
+	case STATE_DYNAMIC_SUBTEST_STARTED:
+		/* Normal flow */
+		break;
+	default:
+		assert(false); /* unreachable */
+	}
+
+	/*
+	 * The result line is written into the logs first so that the
+	 * buffer lengths sampled below already include it.
+	 */
+	comms_inject_subtest_end_log(context,
+				     DYNAMIC_SUBTEST_RESULT,
+				     helper.dynamicsubtestresult.name,
+				     helper.dynamicsubtestresult.result,
+				     helper.dynamicsubtestresult.timeused);
+
+	/* Next dynamic subtest, if any, will begin its logs right after that result line */
+	context->nextdynoutidx = context->outbuflen;
+	context->nextdynerridx = context->errbuflen;
+
+	/*
+	 * Only store the actual result from the packet if we don't
+	 * already have one. If we do, it's from an override.
+	 */
+	if (context->dynamicsubtestresult == NULL) {
+		const char *mappedresult;
+
+		/*
+		 * NOTE(review): parse_result_string() is defined
+		 * outside this chunk; this relies on it always
+		 * setting mappedresult, also for unknown result
+		 * texts - confirm. The NULL argument means the time
+		 * output is not requested here.
+		 */
+		parse_result_string(helper.dynamicsubtestresult.result,
+				    strlen(helper.dynamicsubtestresult.result),
+				    &mappedresult, NULL);
+		/* The context keeps its own heap copy of the mapped result */
+		context->dynamicsubtestresult = strdup(mappedresult);
+	}
+
+	context->state = STATE_BETWEEN_DYNAMIC_SUBTESTS;
+
+	return true;
+}
+
+/*
+ * Visitor callback for a version string packet.
+ *
+ * Replaces the version string held in the comms context (userdata)
+ * with a heap copy of the text carried by the packet. Always returns
+ * true. The packet argument is unused.
+ */
+static bool comms_handle_versionstring(const struct runnerpacket *packet,
+				       runnerpacket_read_helper helper,
+				       void *userdata)
+{
+	struct comms_context *ctx = userdata;
+
+	/* Drop whatever an earlier packet stored; free(NULL) is harmless */
+	free(ctx->igt_version);
+	ctx->igt_version = strdup(helper.versionstring.text);
+
+	return true;
+}
+
+/*
+ * Visitor callback for a result override packet.
+ *
+ * The override text from the packet replaces the stored result of the
+ * dynamic subtest, when one is currently tracked in the context, and
+ * unconditionally replaces the stored result of the subtest. Each
+ * stored result is its own heap copy. Always returns true. The packet
+ * argument is unused.
+ */
+static bool comms_handle_result_override(const struct runnerpacket *packet,
+					 runnerpacket_read_helper helper,
+					 void *userdata)
+{
+	struct comms_context *ctx = userdata;
+	const char *newresult = helper.resultoverride.result;
+
+	if (ctx->current_dynamic_subtest != NULL) {
+		free(ctx->dynamicsubtestresult);
+		ctx->dynamicsubtestresult = strdup(newresult);
+	}
+
+	/* The enclosing subtest gets the override as well */
+	free(ctx->subtestresult);
+	ctx->subtestresult = strdup(newresult);
+
+	return true;
+}
+
+/*
+ * Fill in results for one job list entry from a socket comms dump.
+ *
+ * fd:       descriptor of the comms dump; a negative value means
+ *           there is no dump.
+ * entry:    the job list entry the dump belongs to.
+ * subtests: subtest list handed to the packet handlers via the
+ *           context.
+ * results:  results being generated; its runtimes object is used to
+ *           look up or create the runtime object for the binary.
+ *
+ * Returns COMMSPARSE_EMPTY when fd is negative, otherwise the value
+ * returned by comms_read_dump().
+ */
+static int fill_from_comms(int fd,
+			   struct job_list_entry *entry,
+			   struct subtest_list *subtests,
+			   struct results *results)
+{
+	struct comms_context ctx = {};
+	struct comms_visitor handlers = {
+		.userdata = &ctx,
+
+		.log = comms_handle_log,
+		.exec = comms_handle_exec,
+		.exit = comms_handle_exit,
+		.subtest_start = comms_handle_subtest_start,
+		.subtest_result = comms_handle_subtest_result,
+		.dynamic_subtest_start = comms_handle_dynamic_subtest_start,
+		.dynamic_subtest_result = comms_handle_dynamic_subtest_result,
+		.versionstring = comms_handle_versionstring,
+		.result_override = comms_handle_result_override,
+	};
+	char binaryname[256];
+	int status;
+
+	/* Nothing to parse without a dump file */
+	if (fd < 0)
+		return COMMSPARSE_EMPTY;
+
+	generate_piglit_name(entry->binary, NULL, binaryname, sizeof(binaryname));
+
+	ctx.entry = entry;
+	ctx.binary = entry->binary;
+	ctx.results = results;
+	ctx.subtests = subtests;
+	ctx.binaryruntimeobj = get_or_create_json_object(results->runtimes, binaryname);
+
+	status = comms_read_dump(fd, &handlers);
+
+	/*
+	 * The dump may end while a (dynamic) subtest is still being
+	 * tracked; wrap those up, innermost first, before releasing
+	 * the context.
+	 */
+	if (ctx.current_dynamic_subtest)
+		comms_finish_dynamic_subtest(&ctx);
+	if (ctx.current_test)
+		comms_finish_subtest(&ctx);
+	comms_free_context(&ctx);
+
+	return status;
+}
+
 static bool result_is_requested(struct job_list_entry *entry,
 				const char *subtestname,
 				const char *dynamic_name)
@@ -1505,6 +2107,7 @@ static bool parse_test_directory(int dirfd,
 	int fds[_F_LAST];
 	struct subtest_list subtests = {};
 	bool status = true;
+	int commsparsed;
 
 	if (!open_output_files(dirfd, fds, false)) {
 		fprintf(stderr, "Error opening output files\n");
@@ -1517,10 +2120,28 @@ static bool parse_test_directory(int dirfd,
 	 */
 	fill_from_journal(fds[_F_JOURNAL], entry, &subtests, results);
 
-	if (!fill_from_output(fds[_F_OUT], entry->binary, "out", &subtests, results->tests) ||
-	    !fill_from_output(fds[_F_ERR], entry->binary, "err", &subtests, results->tests) ||
-	    !fill_from_dmesg(fds[_F_DMESG], settings, entry->binary, &subtests, results->tests)) {
-		fprintf(stderr, "Error parsing output files\n");
+	/*
+	 * Get test output from socket comms if it exists, otherwise
+	 * parse stdout/stderr
+	 */
+	commsparsed = fill_from_comms(fds[_F_SOCKET], entry, &subtests, results);
+	if (commsparsed == COMMSPARSE_ERROR) {
+		fprintf(stderr, "Error parsing output files (comms)\n");
+		status = false;
+		goto parse_output_end;
+	}
+
+	if (commsparsed == COMMSPARSE_EMPTY) {
+		if (!fill_from_output(fds[_F_OUT], entry->binary, "out", &subtests, results->tests) ||
+		    !fill_from_output(fds[_F_ERR], entry->binary, "err", &subtests, results->tests)) {
+			fprintf(stderr, "Error parsing output files (out.txt, err.txt)\n");
+			status = false;
+			goto parse_output_end;
+		}
+	}
+
+	if (!fill_from_dmesg(fds[_F_DMESG], settings, entry->binary, &subtests, results->tests)) {
+		fprintf(stderr, "Error parsing output files (dmesg.txt)\n");
 		status = false;
 		goto parse_output_end;
 	}
-- 
2.30.2



More information about the igt-dev mailing list