[Midnightbsd-cvs] src [9055] trunk/sbin/hastd: add support for memsync mode to hastd.

laffer1 at midnightbsd.org laffer1 at midnightbsd.org
Sat Oct 1 06:04:34 EDT 2016


Revision: 9055
          http://svnweb.midnightbsd.org/src/?rev=9055
Author:   laffer1
Date:     2016-10-01 06:04:34 -0400 (Sat, 01 Oct 2016)
Log Message:
-----------
add support for memsync mode to hastd.

Modified Paths:
--------------
    trunk/sbin/hastd/hast.conf.5
    trunk/sbin/hastd/hast.h
    trunk/sbin/hastd/hast_proto.c
    trunk/sbin/hastd/hastd.c
    trunk/sbin/hastd/parse.y
    trunk/sbin/hastd/primary.c
    trunk/sbin/hastd/secondary.c

Modified: trunk/sbin/hastd/hast.conf.5
===================================================================
--- trunk/sbin/hastd/hast.conf.5	2016-10-01 10:03:54 UTC (rev 9054)
+++ trunk/sbin/hastd/hast.conf.5	2016-10-01 10:04:34 UTC (rev 9055)
@@ -129,9 +129,13 @@
 .Aq node
 argument can be replaced either by a full hostname as obtained by
 .Xr gethostname 3 ,
-only first part of the hostname, or by node's UUID as found in the
+only first part of the hostname, by node's UUID as found in the
 .Va kern.hostuuid
 .Xr sysctl 8
+variable
+or by node's hostid as found in the
+.Va kern.hostid
+.Xr sysctl 8
 variable.
 .Pp
 The following statements are available:
@@ -208,15 +212,12 @@
 The risk of such a situation is very small.
 The
 .Ic memsync
-replication mode is currently not implemented.
+replication mode is the default.
 .It Ic fullsync
 .Pp
 Mark the write operation as completed when local as well as remote
 write completes.
 This is the safest and the slowest replication mode.
-The
-.Ic fullsync
-replication mode is the default.
 .It Ic async
 .Pp
 The write operation is reported as complete right after the local write

Modified: trunk/sbin/hastd/hast.h
===================================================================
--- trunk/sbin/hastd/hast.h	2016-10-01 10:03:54 UTC (rev 9054)
+++ trunk/sbin/hastd/hast.h	2016-10-01 10:04:34 UTC (rev 9055)
@@ -53,8 +53,9 @@
  * Version history:
  * 0 - initial version
  * 1 - HIO_KEEPALIVE added
+ * 2 - "memsync" and "received" attributes added for memsync mode
  */
-#define	HAST_PROTO_VERSION	1
+#define	HAST_PROTO_VERSION	2
 
 #define	EHAST_OK		0
 #define	EHAST_NOENTRY		1
@@ -142,8 +143,10 @@
 struct hast_resource {
 	/* Resource name. */
 	char	hr_name[NAME_MAX];
-	/* Replication mode (HAST_REPLICATION_*). */
+	/* Negotiated replication mode (HAST_REPLICATION_*). */
 	int	hr_replication;
+	/* Configured replication mode (HAST_REPLICATION_*). */
+	int	hr_original_replication;
 	/* Provider name that will appear in /dev/hast/. */
 	char	hr_provname[NAME_MAX];
 	/* Synchronization extent size. */
@@ -156,6 +159,8 @@
 	int	hr_compression;
 	/* Checksum algorithm. */
 	int	hr_checksum;
+	/* Protocol version. */
+	int	hr_version;
 
 	/* Path to local component. */
 	char	hr_localpath[PATH_MAX];

Modified: trunk/sbin/hastd/hast_proto.c
===================================================================
--- trunk/sbin/hastd/hast_proto.c	2016-10-01 10:03:54 UTC (rev 9054)
+++ trunk/sbin/hastd/hast_proto.c	2016-10-01 10:04:34 UTC (rev 9055)
@@ -112,7 +112,7 @@
 	if (eb == NULL)
 		goto end;
 
-	hdr.version = HAST_PROTO_VERSION;
+	hdr.version = res != NULL ? res->hr_version : HAST_PROTO_VERSION;
 	hdr.size = htole32((uint32_t)ebuf_size(eb));
 	if (ebuf_add_head(eb, &hdr, sizeof(hdr)) == -1)
 		goto end;
@@ -144,7 +144,7 @@
 	if (proto_recv(conn, &hdr, sizeof(hdr)) == -1)
 		goto fail;
 
-	if (hdr.version != HAST_PROTO_VERSION) {
+	if (hdr.version > HAST_PROTO_VERSION) {
 		errno = ERPCMISMATCH;
 		goto fail;
 	}

Modified: trunk/sbin/hastd/hastd.c
===================================================================
--- trunk/sbin/hastd/hastd.c	2016-10-01 10:03:54 UTC (rev 9054)
+++ trunk/sbin/hastd/hastd.c	2016-10-01 10:04:34 UTC (rev 9055)
@@ -68,7 +68,7 @@
 bool sigexit_received = false;
 /* Path to pidfile. */
 static const char *pidfile;
-/* PID file handle. */
+/* Pidfile handle. */
 struct pidfh *pfh;
 /* Do we run in foreground? */
 static bool foreground;
@@ -748,6 +748,7 @@
 	const char *resname;
 	const unsigned char *token;
 	char laddr[256], raddr[256];
+	uint8_t version;
 	size_t size;
 	pid_t pid;
 	int status;
@@ -797,6 +798,20 @@
 		goto close;
 	}
 	pjdlog_debug(2, "%s: resource=%s", raddr, resname);
+	version = nv_get_uint8(nvin, "version");
+	pjdlog_debug(2, "%s: version=%hhu", raddr, version);
+	if (version == 0) {
+		/*
+		 * If no version is sent, it means this is protocol version 1.
+		 */
+		version = 1;
+	}
+	if (version > HAST_PROTO_VERSION) {
+		pjdlog_info("Remote protocol version %hhu is not supported, falling back to version %hhu.",
+		    version, (unsigned char)HAST_PROTO_VERSION);
+		version = HAST_PROTO_VERSION;
+	}
+	pjdlog_debug(1, "Negotiated protocol version %hhu.", version);
 	token = nv_get_uint8_array(nvin, &size, "token");
 	/*
 	 * NULL token means that this is first connection.
@@ -910,8 +925,10 @@
 	 */
 
 	if (token == NULL) {
+		res->hr_version = version;
 		arc4random_buf(res->hr_token, sizeof(res->hr_token));
 		nvout = nv_alloc();
+		nv_add_uint8(nvout, version, "version");
 		nv_add_uint8_array(nvout, res->hr_token,
 		    sizeof(res->hr_token), "token");
 		if (nv_error(nvout) != 0) {
@@ -922,7 +939,7 @@
 			    strerror(nv_error(nvout)));
 			goto fail;
 		}
-		if (hast_proto_send(NULL, conn, nvout, NULL, 0) == -1) {
+		if (hast_proto_send(res, conn, nvout, NULL, 0) == -1) {
 			int error = errno;
 
 			pjdlog_errno(LOG_ERR, "Unable to send response to %s",

Modified: trunk/sbin/hastd/parse.y
===================================================================
--- trunk/sbin/hastd/parse.y	2016-10-01 10:03:54 UTC (rev 9054)
+++ trunk/sbin/hastd/parse.y	2016-10-01 10:04:34 UTC (rev 9055)
@@ -81,6 +81,7 @@
 isitme(const char *name)
 {
 	char buf[MAXHOSTNAMELEN];
+	unsigned long hostid;
 	char *pos;
 	size_t bufsize;
 
@@ -95,7 +96,7 @@
 		return (1);
 
 	/*
-	 * Now check if it matches first part of the host name.
+	 * Check if it matches first part of the host name.
 	 */
 	pos = strchr(buf, '.');
 	if (pos != NULL && (size_t)(pos - buf) == strlen(name) &&
@@ -104,7 +105,7 @@
 	}
 
 	/*
-	 * At the end check if name is equal to our host's UUID.
+	 * Check if it matches host UUID.
 	 */
 	bufsize = sizeof(buf);
 	if (sysctlbyname("kern.hostuuid", buf, &bufsize, NULL, 0) < 0) {
@@ -114,6 +115,18 @@
 	if (strcasecmp(buf, name) == 0)
 		return (1);
 
+ 	/*
+	 * Check if it matches hostid.
+	 */
+	bufsize = sizeof(hostid);
+	if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) {
+		pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed");
+		return (-1);
+	}
+	(void)snprintf(buf, sizeof(buf), "hostid%lu", hostid);
+	if (strcmp(buf, name) == 0)
+		return (1);
+
 	/*
 	 * Looks like this isn't about us.
 	 */
@@ -138,6 +151,7 @@
 {
 	static char names[MAXHOSTNAMELEN * 3];
 	char buf[MAXHOSTNAMELEN];
+	unsigned long hostid;
 	char *pos;
 	size_t bufsize;
 
@@ -165,7 +179,17 @@
 		return (-1);
 	}
 	(void)strlcat(names, buf, sizeof(names));
+	(void)strlcat(names, ", ", sizeof(names));
 
+	/* Host ID. */
+	bufsize = sizeof(hostid);
+	if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) {
+		pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed");
+		return (-1);
+	}
+	(void)snprintf(buf, sizeof(buf), "hostid%lu", hostid);
+	(void)strlcat(names, buf, sizeof(names));
+
 	*namesp = names;
 
 	return (0);
@@ -190,7 +214,7 @@
 	lineno = 0;
 
 	depth0_timeout = HAST_TIMEOUT;
-	depth0_replication = HAST_REPLICATION_FULLSYNC;
+	depth0_replication = HAST_REPLICATION_MEMSYNC;
 	depth0_checksum = HAST_CHECKSUM_NONE;
 	depth0_compression = HAST_COMPRESSION_HOLE;
 	strlcpy(depth0_control, HAST_CONTROL, sizeof(depth0_control));
@@ -300,12 +324,8 @@
 			 * Use global or default setting.
 			 */
 			curres->hr_replication = depth0_replication;
+			curres->hr_original_replication = depth0_replication;
 		}
-		if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) {
-			pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".",
-			    "memsync", "fullsync");
-			curres->hr_replication = HAST_REPLICATION_FULLSYNC;
-		}
 		if (curres->hr_checksum == -1) {
 			/*
 			 * Checksum is not set at resource-level.
@@ -523,6 +543,7 @@
 		case 1:
 			PJDLOG_ASSERT(curres != NULL);
 			curres->hr_replication = $2;
+			curres->hr_original_replication = $2;
 			break;
 		default:
 			PJDLOG_ABORT("replication at wrong depth level");
@@ -820,8 +841,10 @@
 		curres->hr_role = HAST_ROLE_INIT;
 		curres->hr_previous_role = HAST_ROLE_INIT;
 		curres->hr_replication = -1;
+		curres->hr_original_replication = -1;
 		curres->hr_checksum = -1;
 		curres->hr_compression = -1;
+		curres->hr_version = 1;
 		curres->hr_timeout = -1;
 		curres->hr_exec[0] = '\0';
 		curres->hr_provname[0] = '\0';

Modified: trunk/sbin/hastd/primary.c
===================================================================
--- trunk/sbin/hastd/primary.c	2016-10-01 10:03:54 UTC (rev 9054)
+++ trunk/sbin/hastd/primary.c	2016-10-01 10:04:34 UTC (rev 9055)
@@ -35,7 +35,6 @@
 #include <sys/time.h>
 #include <sys/bio.h>
 #include <sys/disk.h>
-#include <sys/refcount.h>
 #include <sys/stat.h>
 
 #include <geom/gate/g_gate.h>
@@ -65,6 +64,7 @@
 #include "metadata.h"
 #include "proto.h"
 #include "pjdlog.h"
+#include "refcnt.h"
 #include "subr.h"
 #include "synch.h"
 
@@ -545,7 +545,7 @@
 
 	return (0);
 }
- 
+
 /*
  * Function instructs GEOM_GATE to handle reads directly from within the kernel.
  */
@@ -579,6 +579,7 @@
 	int32_t extentsize;
 	int64_t datasize;
 	uint32_t mapsize;
+	uint8_t version;
 	size_t size;
 	int error;
 
@@ -599,6 +600,7 @@
 	 */
 	nvout = nv_alloc();
 	nv_add_string(nvout, res->hr_name, "resource");
+	nv_add_uint8(nvout, HAST_PROTO_VERSION, "version");
 	if (nv_error(nvout) != 0) {
 		pjdlog_common(LOG_WARNING, 0, nv_error(nvout),
 		    "Unable to allocate header for connection with %s",
@@ -628,6 +630,20 @@
 		nv_free(nvin);
 		goto close;
 	}
+	version = nv_get_uint8(nvin, "version");
+	if (version == 0) {
+		/*
+		 * If no version is sent, it means this is protocol version 1.
+		 */
+		version = 1;
+	}
+	if (version > HAST_PROTO_VERSION) {
+		pjdlog_warning("Invalid version received (%hhu).", version);
+		nv_free(nvin);
+		goto close;
+	}
+	res->hr_version = version;
+	pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version);
 	token = nv_get_uint8_array(nvin, &size, "token");
 	if (token == NULL) {
 		pjdlog_warning("Handshake header from %s has no 'token' field.",
@@ -778,6 +794,16 @@
 		pjdlog_errno(LOG_WARNING, "Unable to set connection direction");
 #endif
 	pjdlog_info("Connected to %s.", res->hr_remoteaddr);
+	if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC &&
+	    res->hr_version < 2) {
+		pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode.");
+		res->hr_replication = HAST_REPLICATION_FULLSYNC;
+	} else if (res->hr_replication != res->hr_original_replication) {
+		/*
+		 * This is in case hastd disconnected and was upgraded.
+		 */
+		res->hr_replication = res->hr_original_replication;
+	}
 	if (inp != NULL && outp != NULL) {
 		*inp = in;
 		*outp = out;
@@ -1011,7 +1037,8 @@
 }
 
 static void
-reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...)
+reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio,
+    const char *fmt, ...)
 {
 	char msg[1024];
 	va_list ap;
@@ -1022,13 +1049,11 @@
 	switch (ggio->gctl_cmd) {
 	case BIO_READ:
 		(void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).",
-		    (uintmax_t)ggio->gctl_offset,
-		    (uintmax_t)ggio->gctl_length);
+		    (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
 		break;
 	case BIO_DELETE:
 		(void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).",
-		    (uintmax_t)ggio->gctl_offset,
-		    (uintmax_t)ggio->gctl_length);
+		    (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
 		break;
 	case BIO_FLUSH:
 		(void)snprlcat(msg, sizeof(msg), "FLUSH.");
@@ -1035,8 +1060,7 @@
 		break;
 	case BIO_WRITE:
 		(void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).",
-		    (uintmax_t)ggio->gctl_offset,
-		    (uintmax_t)ggio->gctl_length);
+		    (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length);
 		break;
 	default:
 		(void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).",
@@ -1276,8 +1300,13 @@
 		}
 		pjdlog_debug(2,
 		    "ggate_recv: (%p) Moving request to the send queues.", hio);
-		refcount_init(&hio->hio_countdown, ncomps);
-		for (ii = ncomp; ii < ncomp + ncomps; ii++)
+		hio->hio_countdown = ncomps;
+		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
+		    ggio->gctl_cmd == BIO_WRITE) {
+			/* Each remote request needs two responses in memsync. */
+			hio->hio_countdown++;
+		}
+		for (ii = ncomp; ii < ncomps; ii++)
 			QUEUE_INSERT1(hio, send, ii);
 	}
 	/* NOTREACHED */
@@ -1348,8 +1377,7 @@
 			} else {
 				hio->hio_errors[ncomp] = 0;
 				if (hio->hio_replication ==
-				    HAST_REPLICATION_ASYNC &&
-				    !ISSYNCREQ(hio)) {
+				    HAST_REPLICATION_ASYNC) {
 					ggio->gctl_error = 0;
 					write_complete(res, hio);
 				}
@@ -1387,8 +1415,42 @@
 			}
 			break;
 		}
-		if (!refcount_release(&hio->hio_countdown))
-			continue;
+
+		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
+		    ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
+			if (refcnt_release(&hio->hio_countdown) > 0)
+				continue;
+		} else {
+			/*
+			 * Depending on hio_countdown value, requests finished
+			 * in the following order:
+			 * 0: remote memsync, remote final, local write
+			 * 1: remote memsync, local write, (remote final)
+			 * 2: local write, (remote memsync), (remote final)
+			 */
+			switch (refcnt_release(&hio->hio_countdown)) {
+			case 0:
+				/*
+				 * Local write finished as last.
+				 */
+				break;
+			case 1:
+				/*
+				 * Local write finished after remote memsync
+				 * reply arrived. We can complete the write now.
+				 */
+				if (hio->hio_errors[0] == 0)
+					write_complete(res, hio);
+				continue;
+			case 2:
+				/*
+				 * Local write finished as first.
+				 */
+				continue;
+			default:
+				PJDLOG_ABORT("Invalid hio_countdown.");
+			}
+		}
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1510,6 +1572,10 @@
 		nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq");
 		nv_add_uint64(nv, offset, "offset");
 		nv_add_uint64(nv, length, "length");
+		if (hio->hio_replication == HAST_REPLICATION_MEMSYNC &&
+		    ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) {
+			nv_add_uint8(nv, 1, "memsync");
+		}
 		if (nv_error(nv) != 0) {
 			hio->hio_errors[ncomp] = nv_error(nv);
 			pjdlog_debug(2,
@@ -1570,7 +1636,7 @@
 done_queue:
 		nv_free(nv);
 		if (ISSYNCREQ(hio)) {
-			if (!refcount_release(&hio->hio_countdown))
+			if (refcnt_release(&hio->hio_countdown) > 0)
 				continue;
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1585,8 +1651,10 @@
 				(void)hast_activemap_flush(res);
 			}
 			mtx_unlock(&res->hr_amp_lock);
+			if (hio->hio_replication == HAST_REPLICATION_MEMSYNC)
+				(void)refcnt_release(&hio->hio_countdown);
 		}
-		if (!refcount_release(&hio->hio_countdown))
+		if (refcnt_release(&hio->hio_countdown) > 0)
 			continue;
 		pjdlog_debug(2,
 		    "remote_send: (%p) Moving request to the done queue.",
@@ -1610,6 +1678,7 @@
 	struct nv *nv;
 	unsigned int ncomp;
 	uint64_t seq;
+	bool memsyncack;
 	int error;
 
 	/* Remote component is 1 for now. */
@@ -1625,6 +1694,8 @@
 		}
 		mtx_unlock(&hio_recv_list_lock[ncomp]);
 
+		memsyncack = false;
+
 		rw_rlock(&hio_remote_lock[ncomp]);
 		if (!ISCONNECTED(res, ncomp)) {
 			rw_unlock(&hio_remote_lock[ncomp]);
@@ -1654,6 +1725,7 @@
 			nv_free(nv);
 			continue;
 		}
+		memsyncack = nv_exists(nv, "received");
 		mtx_lock(&hio_recv_list_lock[ncomp]);
 		TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) {
 			if (hio->hio_ggio.gctl_seq == seq) {
@@ -1709,8 +1781,80 @@
 		hio->hio_errors[ncomp] = 0;
 		nv_free(nv);
 done_queue:
-		if (!refcount_release(&hio->hio_countdown))
-			continue;
+		if (hio->hio_replication != HAST_REPLICATION_MEMSYNC ||
+		    hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) {
+			if (refcnt_release(&hio->hio_countdown) > 0)
+				continue;
+		} else {
+			/*
+			 * Depending on hio_countdown value, requests finished
+			 * in the following order:
+			 *
+			 * 0: local write, remote memsync, remote final
+			 * or
+			 * 0: remote memsync, local write, remote final
+			 *
+			 * 1: local write, remote memsync, (remote final)
+			 * or
+			 * 1: remote memsync, remote final, (local write)
+			 *
+			 * 2: remote memsync, (local write), (remote final)
+			 * or
+			 * 2: remote memsync, (remote final), (local write)
+			 */
+			switch (refcnt_release(&hio->hio_countdown)) {
+			case 0:
+				/*
+				 * Remote final reply arrived.
+				 */
+				PJDLOG_ASSERT(!memsyncack);
+				break;
+			case 1:
+				if (memsyncack) {
+					/*
+					 * Local request already finished, so we
+					 * can complete the write.
+					 */
+					if (hio->hio_errors[0] == 0)
+						write_complete(res, hio);
+					/*
+					 * We still need to wait for final
+					 * remote reply.
+					 */
+					pjdlog_debug(2,
+					    "remote_recv: (%p) Moving request back to the recv queue.",
+					    hio);
+					mtx_lock(&hio_recv_list_lock[ncomp]);
+					TAILQ_INSERT_TAIL(&hio_recv_list[ncomp],
+					    hio, hio_next[ncomp]);
+					mtx_unlock(&hio_recv_list_lock[ncomp]);
+				} else {
+					/*
+					 * Remote final reply arrived before
+					 * local write finished.
+					 * Nothing to do in such case.
+					 */
+				}
+				continue;
+			case 2:
+				/*
+				 * We received remote memsync reply even before
+				 * local write finished.
+				 */
+				PJDLOG_ASSERT(memsyncack);
+
+				pjdlog_debug(2,
+				    "remote_recv: (%p) Moving request back to the recv queue.",
+				    hio);
+				mtx_lock(&hio_recv_list_lock[ncomp]);
+				TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio,
+				    hio_next[ncomp]);
+				mtx_unlock(&hio_recv_list_lock[ncomp]);
+				continue;
+			default:
+				PJDLOG_ABORT("Invalid hio_countdown.");
+			}
+		}
 		if (ISSYNCREQ(hio)) {
 			mtx_lock(&sync_lock);
 			SYNCREQDONE(hio);
@@ -1995,7 +2139,7 @@
 			ncomp = 1;
 		}
 		mtx_unlock(&metadata_lock);
-		refcount_init(&hio->hio_countdown, 1);
+		hio->hio_countdown = 1;
 		QUEUE_INSERT1(hio, send, ncomp);
 
 		/*
@@ -2045,7 +2189,7 @@
 
 		pjdlog_debug(2, "sync: (%p) Moving request to the send queue.",
 		    hio);
-		refcount_init(&hio->hio_countdown, 1);
+		hio->hio_countdown = 1;
 		QUEUE_INSERT1(hio, send, ncomp);
 
 		/*

Modified: trunk/sbin/hastd/secondary.c
===================================================================
--- trunk/sbin/hastd/secondary.c	2016-10-01 10:03:54 UTC (rev 9054)
+++ trunk/sbin/hastd/secondary.c	2016-10-01 10:04:34 UTC (rev 9055)
@@ -71,6 +71,7 @@
 	uint8_t		 hio_cmd;
 	uint64_t	 hio_offset;
 	uint64_t	 hio_length;
+	bool		 hio_memsync;
 	TAILQ_ENTRY(hio) hio_next;
 };
 
@@ -135,9 +136,25 @@
 	hio->hio_cmd = HIO_UNDEF;
 	hio->hio_offset = 0;
 	hio->hio_length = 0;
+	hio->hio_memsync = false;
 }
 
 static void
+hio_copy(const struct hio *srchio, struct hio *dsthio)
+{
+
+	/*
+	 * We don't copy hio_error, hio_data and hio_next fields.
+	 */
+
+	dsthio->hio_seq = srchio->hio_seq;
+	dsthio->hio_cmd = srchio->hio_cmd;
+	dsthio->hio_offset = srchio->hio_offset;
+	dsthio->hio_length = srchio->hio_length;
+	dsthio->hio_memsync = srchio->hio_memsync;
+}
+
+static void
 init_environment(void)
 {
 	struct hio *hio;
@@ -543,8 +560,10 @@
 	case HIO_FLUSH:
 	case HIO_KEEPALIVE:
 		break;
+	case HIO_WRITE:
+		hio->hio_memsync = nv_exists(nv, "memsync");
+		/* FALLTHROUGH */
 	case HIO_READ:
-	case HIO_WRITE:
 	case HIO_DELETE:
 		hio->hio_offset = nv_get_uint64(nv, "offset");
 		if (nv_error(nv) != 0) {
@@ -621,7 +640,7 @@
 recv_thread(void *arg)
 {
 	struct hast_resource *res = arg;
-	struct hio *hio;
+	struct hio *hio, *mshio;
 	struct nv *nv;
 
 	for (;;) {
@@ -675,6 +694,27 @@
 				secondary_exit(EX_TEMPFAIL,
 				    "Unable to receive request data");
 			}
+			if (hio->hio_memsync) {
+				/*
+				 * For memsync requests we expect two replies.
+				 * Clone the hio so we can handle both of them.
+				 */
+				pjdlog_debug(2, "recv: Taking free request.");
+				QUEUE_TAKE(free, mshio);
+				pjdlog_debug(2, "recv: (%p) Got request.",
+				    mshio);
+				hio_copy(hio, mshio);
+				mshio->hio_error = 0;
+				/*
+				 * We want to keep 'memsync' tag only on the
+				 * request going onto send queue (mshio).
+				 */
+				hio->hio_memsync = false;
+				pjdlog_debug(2,
+				    "recv: (%p) Moving memsync request to the send queue.",
+				    mshio);
+				QUEUE_INSERT(send, mshio);
+			}
 		}
 		nv_free(nv);
 		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
@@ -819,6 +859,10 @@
 		nvout = nv_alloc();
 		/* Copy sequence number. */
 		nv_add_uint64(nvout, hio->hio_seq, "seq");
+		if (hio->hio_memsync) {
+			PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
+			nv_add_int8(nvout, 1, "received");
+		}
 		switch (hio->hio_cmd) {
 		case HIO_READ:
 			if (hio->hio_error == 0) {



More information about the Midnightbsd-cvs mailing list