[Midnightbsd-cvs] src: bin/cpdup: Bring in changes from DragonFly to MidnightBSD version

laffer1 at midnightbsd.org laffer1 at midnightbsd.org
Sat Aug 16 23:04:22 EDT 2008


Log Message:
-----------
Bring in changes from DragonFly to the MidnightBSD version of cpdup.

Update to 1.11 (previous version 1.09).

This includes some improvements to the multithreaded code as well as fixes for the protocol.  Because the protocol version changed, older versions of cpdup are not compatible, so it's important to run the same version on both the local and the remote host.  This should work with DragonFly 2.0's cpdup.

Modified Files:
--------------
    src/bin/cpdup:
        PORTING (r1.2 -> r1.3)
        cpdup.1 (r1.2 -> r1.3)
        cpdup.c (r1.2 -> r1.3)
        cpdup.h (r1.2 -> r1.3)
        hclink.c (r1.2 -> r1.3)
        hclink.h (r1.2 -> r1.3)
        hcproto.c (r1.2 -> r1.3)
        hcproto.h (r1.2 -> r1.3)
        misc.c (r1.2 -> r1.3)

-------------- next part --------------
Index: hcproto.c
===================================================================
RCS file: /home/cvs/src/bin/cpdup/hcproto.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/hcproto.c -L bin/cpdup/hcproto.c -u -r1.2 -r1.3
--- bin/cpdup/hcproto.c
+++ bin/cpdup/hcproto.c
@@ -3,7 +3,7 @@
  *
  * This module implements a simple remote control protocol
  *
- * $DragonFly: src/bin/cpdup/hcproto.c,v 1.2 2008/04/10 22:09:08 dillon Exp $
+ * $DragonFly: src/bin/cpdup/hcproto.c,v 1.6 2008/04/16 17:38:19 dillon Exp $
  */
 
 #include "cpdup.h"
@@ -29,6 +29,7 @@
 static int rc_chown(hctransaction_t trans, struct HCHead *);
 static int rc_lchown(hctransaction_t trans, struct HCHead *);
 static int rc_chmod(hctransaction_t trans, struct HCHead *);
+static int rc_mknod(hctransaction_t trans, struct HCHead *);
 static int rc_link(hctransaction_t trans, struct HCHead *);
 #ifdef _ST_FLAGS_PRESENT_
 static int rc_chflags(hctransaction_t trans, struct HCHead *);
@@ -56,6 +57,7 @@
     { HC_CHOWN,		rc_chown },
     { HC_LCHOWN,	rc_lchown },
     { HC_CHMOD,		rc_chmod },
+    { HC_MKNOD,		rc_mknod },
     { HC_LINK,		rc_link },
 #ifdef _ST_FLAGS_PRESENT_
     { HC_CHFLAGS,	rc_chflags },
@@ -70,8 +72,10 @@
 int
 hc_connect(struct HostConf *hc)
 {
-    if (hcc_connect(hc) < 0)
+    if (hcc_connect(hc) < 0) {
+	fprintf(stderr, "Unable to connect to %s\n", hc->host);
 	return(-1);
+    }
     return(hc_hello(hc));
 }
 
@@ -103,9 +107,12 @@
 
     trans = hcc_start_command(hc, HC_HELLO);
     hcc_leaf_string(trans, LC_HELLOSTR, hostbuf);
-    hcc_leaf_int32(trans, LC_VERSION, 1);
-    if ((head = hcc_finish_command(trans)) == NULL)
+    hcc_leaf_int32(trans, LC_VERSION, HCPROTO_VERSION);
+    if ((head = hcc_finish_command(trans)) == NULL) {
+	fprintf(stderr, "Connected to %s but remote failed to complete hello\n",
+		hc->host);
 	return(-1);
+    }
 
     if (head->error) {
 	fprintf(stderr, "Connected to %s but remote returned error %d\n",
@@ -125,6 +132,11 @@
 	    break;
 	}
     }
+    if (hc->version < HCPROTO_VERSION_COMPAT) {
+	fprintf(stderr, "Remote cpdup at %s has an incompatible version\n",
+		hc->host);
+	error = -1;
+    }
     if (error < 0)
 	fprintf(stderr, "Handshake failed with %s\n", hc->host);
     return (error);
@@ -142,7 +154,7 @@
 	hostbuf[0] = '?';
 
     hcc_leaf_string(trans, LC_HELLOSTR, hostbuf);
-    hcc_leaf_int32(trans, LC_VERSION, 1);
+    hcc_leaf_int32(trans, LC_VERSION, HCPROTO_VERSION);
     return(0);
 }
 
@@ -492,6 +504,8 @@
 	switch(item->leafid) {
 	case LC_DESCRIPTOR:
 	    dir = hcc_get_descriptor(trans->hc, HCC_INT32(item), HC_DESC_DIR);
+	    if (dir != NULL)
+		    hcc_set_descriptor(trans->hc, HCC_INT32(item), NULL, HC_DESC_DIR);
 	    break;
 	}
     }
@@ -660,6 +674,22 @@
     return(close(fd));
 }
 
+static int
+getiolimit(void)
+{
+#if USE_PTHREADS
+    if (CurParallel < 2)
+	return(32768);
+    if (CurParallel < 4)
+	return(16384);
+    if (CurParallel < 8)
+	return(8192);
+    return(4096);
+#else
+    return(32768);
+#endif
+}
+
 /*
  * READ
  */
@@ -679,7 +709,8 @@
     if (fdp) {
 	r = 0;
 	while (bytes) {
-	    int n = (bytes > 32768) ? 32768 : bytes;
+	    size_t limit = getiolimit();
+	    int n = (bytes > limit) ? limit : bytes;
 	    int x = 0;
 
 	    trans = hcc_start_command(hc, HC_READ);
@@ -762,7 +793,8 @@
     if (fdp) {
 	r = 0;
 	while (bytes) {
-	    int n = (bytes > 32768) ? 32768 : bytes;
+	    size_t limit = getiolimit();
+	    int n = (bytes > limit) ? limit : bytes;
 	    int x = 0;
 
 	    trans = hcc_start_command(hc, HC_WRITE);
@@ -1090,6 +1122,55 @@
 }
 
 /*
+ * MKNOD
+ */
+int
+hc_mknod(struct HostConf *hc, const char *path, mode_t mode, dev_t rdev)
+{
+    hctransaction_t trans;
+    struct HCHead *head;
+
+    if (hc == NULL || hc->host == NULL)
+	return(mknod(path, mode, rdev));
+
+    trans = hcc_start_command(hc, HC_MKNOD);
+    hcc_leaf_string(trans, LC_PATH1, path);
+    hcc_leaf_int32(trans, LC_MODE, mode);
+    hcc_leaf_int32(trans, LC_RDEV, rdev);
+    if ((head = hcc_finish_command(trans)) == NULL)
+	return(-1);
+    if (head->error)
+	return(-1);
+    return(0);
+}
+
+static int
+rc_mknod(hctransaction_t trans __unused, struct HCHead *head)
+{
+    struct HCLeaf *item;
+    const char *path = NULL;
+    mode_t mode = 0666;
+    dev_t rdev = 0;
+
+    for (item = hcc_firstitem(head); item; item = hcc_nextitem(head, item)) {
+	switch(item->leafid) {
+	case LC_PATH1:
+	    path = HCC_STRING(item);
+	    break;
+	case LC_MODE:
+	    mode = HCC_INT32(item);
+	    break;
+	case LC_RDEV:
+	    rdev = HCC_INT32(item);
+	    break;
+	}
+    }
+    if (path == NULL)
+	return(-1);
+    return(mknod(path, mode, rdev));
+}
+
+/*
  * LINK
  */
 int
Index: hclink.h
===================================================================
RCS file: /home/cvs/src/bin/cpdup/hclink.h,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/hclink.h -L bin/cpdup/hclink.h -u -r1.2 -r1.3
--- bin/cpdup/hclink.h
+++ bin/cpdup/hclink.h
@@ -22,9 +22,11 @@
     struct HostConf *hc;
     u_int16_t	id;		/* assigned transaction id */
     int		windex;		/* output buffer index */
-    enum { HCT_IDLE, HCT_SENT, HCT_REPLIED } state;
+    enum { HCT_IDLE, HCT_SENT, HCT_REPLIED, HCT_DONE } state;
 #if USE_PTHREADS
     pthread_t	tid;
+    pthread_cond_t cond;
+    int		waiting;
 #endif
     char	rbuf[65536];	/* input buffer */
     char	wbuf[65536];	/* output buffer */
@@ -44,8 +46,9 @@
     int		version;	/* cpdup protocol version */
     struct HCHostDesc *hostdescs;
 #if USE_PTHREADS
-    pthread_mutex_t read_mutex;
+    pthread_mutex_t hct_mutex[HCTHASH_SIZE];
     hctransaction_t hct_hash[HCTHASH_SIZE];
+    pthread_t	reader_thread;
 #else
     struct HCTransaction trans;
 #endif
@@ -113,6 +116,7 @@
 struct HCLeaf *hcc_nextitem(struct HCHead *head, struct HCLeaf *item);
 
 void hcc_debug_dump(struct HCHead *head);
+void hcc_free_trans(struct HostConf *hc);
 
 #endif
 
Index: hcproto.h
===================================================================
RCS file: /home/cvs/src/bin/cpdup/hcproto.h,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/hcproto.h -L bin/cpdup/hcproto.h -u -r1.2 -r1.3
--- bin/cpdup/hcproto.h
+++ bin/cpdup/hcproto.h
@@ -8,6 +8,9 @@
 #ifndef _HCPROTO_H_
 #define _HCPROTO_H_
 
+#define HCPROTO_VERSION               2
+#define HCPROTO_VERSION_COMPAT        2
+
 #define HC_HELLO	0x0001
 
 #define HC_STAT		0x0010
@@ -32,6 +35,7 @@
 #define HC_SYMLINK	0x0023
 #define HC_RENAME	0x0024
 #define HC_UTIMES	0x0025
+#define HC_MKNOD	0x0026
 
 #define LC_HELLOSTR	(0x0001|LCF_STRING)
 #define LC_PATH1	(0x0010|LCF_STRING)
@@ -87,6 +91,7 @@
 int hc_chown(struct HostConf *hc, const char *path, uid_t owner, gid_t group);
 int hc_lchown(struct HostConf *hc, const char *path, uid_t owner, gid_t group);
 int hc_chmod(struct HostConf *hc, const char *path, mode_t mode);
+int hc_mknod(struct HostConf *hc, const char *path, mode_t mode, dev_t rdev);
 int hc_link(struct HostConf *hc, const char *name1, const char *name2);
 int hc_chflags(struct HostConf *hc, const char *path, u_long flags);
 int hc_readlink(struct HostConf *hc, const char *path, char *buf, int bufsiz);
Index: cpdup.1
===================================================================
RCS file: /home/cvs/src/bin/cpdup/cpdup.1,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/cpdup.1 -L bin/cpdup/cpdup.1 -u -r1.2 -r1.3
--- bin/cpdup/cpdup.1
+++ bin/cpdup/cpdup.1
@@ -4,7 +4,7 @@
 .\"
 .\" $MidnightBSD$
 .\" $DragonFly: src/bin/cpdup/cpdup.1,v 1.28 2008/04/10 22:09:08 dillon Exp $
-.Dd March 22, 2008
+.Dd August 16, 2008
 .Dt CPDUP 1
 .Os
 .Sh NAME
@@ -12,6 +12,7 @@
 .Nd mirror filesystems
 .Sh SYNOPSIS
 .Nm
+.Op Fl C
 .Op Fl v[vv..]
 .Op Fl u
 .Op Fl I
@@ -19,7 +20,8 @@
 .Op Fl s0
 .Op Fl i0
 .Op Fl j0
-.Op Fl p<num>
+.Op Fl l
+.Op Fl p Ar number
 .Op Fl q
 .Op Fl o
 .Op Fl m
@@ -64,6 +66,10 @@
 .Pp
 The following options are available:
 .Bl -tag -width flag
+.It Fl C
+If the source or target is a remote host request that the
+.Xr ssh 1
+session be compressed.
 .It Fl v[vv]
 Set verboseness.  By default
 .Nm
@@ -102,8 +108,10 @@
 Do not try to recreate CHR or BLK devices.
 .It Fl l
 Line buffer verbose output.
-.It Fl p<number>
-Use threaded transactions with up to the specified number of threads.
+.It Fl p Ar number
+Use threaded transactions with up to the specified
+.Ar number
+of threads.
 This typically improves operation when a remote host specification is
 given.
 .It Fl q
Index: misc.c
===================================================================
RCS file: /home/cvs/src/bin/cpdup/misc.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/misc.c -L bin/cpdup/misc.c -u -r1.2 -r1.3
--- bin/cpdup/misc.c
+++ bin/cpdup/misc.c
@@ -85,6 +85,68 @@
     return(s);
 }
 
+#ifdef DEBUG_MALLOC
+
+#undef malloc
+#undef free
+
+struct malloc_info {
+	struct malloc_info *next;
+	struct malloc_info *prev;
+	const char *file;
+	int magic;
+	int line;
+};
+
+struct malloc_info DummyInfo = { &DummyInfo, &DummyInfo, NULL, 0, 0 };
+struct malloc_info *InfoList = &DummyInfo;
+
+void *
+debug_malloc(size_t bytes, const char *file, int line)
+{
+	struct malloc_info *info = malloc(sizeof(*info) + bytes);
+
+	info->magic = 0x5513A4C2;
+	info->file = file;
+	info->line = line;
+
+	info->next = InfoList;
+	info->prev = InfoList->prev;
+	info->next->prev = info;
+	info->prev->next = info;
+	return(info + 1);
+}
+
+void
+debug_free(void *ptr)
+{
+	struct malloc_info *info = (struct malloc_info *)ptr - 1;
+	struct malloc_info *scan;
+	static int report;
+
+	for (scan = DummyInfo.next; scan != &DummyInfo; scan = scan->next) {
+		if (info == scan) {
+			assert(info->magic == 0x5513A4C2);
+			info->magic = 0;
+			info->next->prev = info->prev;
+			info->prev->next = info->next;
+			free(info);
+			break;
+		}
+	}
+	if (scan == &DummyInfo)
+		free(ptr);
+
+	if ((++report & 65535) == 0) {
+		printf("--- report\n");
+		for (scan = DummyInfo.next; scan != &DummyInfo; scan = scan->next) {
+			printf("%-15s %d\n", scan->file, scan->line);
+		}
+	}
+}
+
+#endif
+
 void
 fatal(const char *ctl, ...)
 {
@@ -92,7 +154,8 @@
 
     if (ctl == NULL) {
 	puts("cpdup [<options>] src [dest]");
-	puts("    -v[vv]      verbose level (-vv is typical)\n"
+	puts("    -C          request compressed ssh link if remote operation\n"
+	     "    -v[vv]      verbose level (-vv is typical)\n"
 	     "    -u          use unbuffered output for -v[vv]\n"
 	     "    -I          display performance summary\n"
 	     "    -f          force update even if files look the same\n"
@@ -118,7 +181,7 @@
 #endif
 	     "    -x          use .cpignore as exclusion file\n"
 	     "    -X file     specify exclusion file\n"
-	     " Version 1.09 by Matt Dillon and Dima Ruban\n"
+	     " Version 1.11 by Matt Dillon and Dima Ruban\n"
 	);
 	exit(0);
     } else {
Index: cpdup.h
===================================================================
RCS file: /home/cvs/src/bin/cpdup/cpdup.h,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/cpdup.h -L bin/cpdup/cpdup.h -u -r1.2 -r1.3
--- bin/cpdup/cpdup.h
+++ bin/cpdup/cpdup.h
@@ -47,6 +47,8 @@
 extern const char *FSMIDCacheFile;
 
 extern int SummaryOpt;
+extern int CompressOpt;
+extern int CurParallel;
 
 extern int64_t CountSourceBytes;
 extern int64_t CountSourceItems;
@@ -59,3 +61,11 @@
 #if USE_PTHREADS
 extern pthread_mutex_t MasterMutex;
 #endif
+
+#ifdef DEBUG_MALLOC
+void *debug_malloc(size_t bytes, const char *file, int line);
+void debug_free(void *ptr);
+
+#define malloc(bytes)	debug_malloc(bytes, __FILE__, __LINE__)
+#define free(ptr)	debug_free(ptr)
+#endif
Index: hclink.c
===================================================================
RCS file: /home/cvs/src/bin/cpdup/hclink.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/hclink.c -L bin/cpdup/hclink.c -u -r1.2 -r1.3
--- bin/cpdup/hclink.c
+++ bin/cpdup/hclink.c
@@ -11,7 +11,10 @@
 #include "hclink.h"
 #include "hcproto.h"
 
-static struct HCHead *hcc_read_command(hctransaction_t trans, int anypkt);
+#if USE_PTHREADS
+static void * hcc_reader_thread(void *arg);
+#endif
+static struct HCHead *hcc_read_command(struct HostConf *hc, hctransaction_t trans);
 static void hcc_start_reply(hctransaction_t trans, struct HCHead *rhead);
 
 int
@@ -19,6 +22,7 @@
 {
     int fdin[2];
     int fdout[2];
+    const char *av[16];
 
     if (hc == NULL || hc->host == NULL)
 	return(0);
@@ -34,13 +38,26 @@
 	/*
 	 * Child process
 	 */
+	int n;
+
 	dup2(fdin[1], 1);
 	close(fdin[0]);
 	close(fdin[1]);
 	dup2(fdout[0], 0);
 	close(fdout[0]);
 	close(fdout[1]);
-	execl("/usr/bin/ssh", "ssh", "-T", hc->host, "cpdup", "-S", (char *) NULL);
+
+	n = 0;
+	av[n++] = "ssh";
+	if (CompressOpt)
+	    av[n++] = "-C";
+	av[n++] = "-T";
+	av[n++] = hc->host;
+	av[n++] = "cpdup";
+	av[n++] = "-S";
+	av[n++] = NULL;
+
+	execv("/usr/bin/ssh", (void *)av);
 	_exit(1);
     } else if (hc->pid < 0) {
 	return(-1);
@@ -53,6 +70,9 @@
 	hc->fdin = fdin[0];
 	close(fdout[0]);
 	hc->fdout = fdout[1];
+#if USE_PTHREADS
+	pthread_create(&hc->reader_thread, NULL, hcc_reader_thread, hc);
+#endif
 	return(0);
     }
 }
@@ -91,6 +111,9 @@
     hcslave.fdout = fdout;
     trans.hc = &hcslave;
 
+#if USE_PTHREADS
+    pthread_mutex_unlock(&MasterMutex);
+#endif
     /*
      * Process commands on fdin and write out results on fdout
      */
@@ -98,7 +121,7 @@
 	/*
 	 * Get the command
 	 */
-	head = hcc_read_command(&trans, 1);
+	head = hcc_read_command(trans.hc, &trans);
 	if (head == NULL)
 	    break;
 
@@ -140,90 +163,121 @@
     return(0);
 }
 
+#if USE_PTHREADS
+/*
+ * This thread collects responses from the link.  It is run without
+ * the MasterMutex.
+ */
+static void *
+hcc_reader_thread(void *arg)
+{
+    struct HostConf *hc = arg;
+    struct HCHead *rhead;
+    hctransaction_t scan;
+    int i;
+
+    pthread_detach(pthread_self());
+    while (hcc_read_command(hc, NULL) != NULL)
+	;
+    hc->reader_thread = NULL;
+
+    /*
+     * Clean up any threads stuck waiting for a reply.
+     */
+    pthread_mutex_lock(&MasterMutex);
+    for (i = 0; i < HCTHASH_SIZE; ++i) {
+	pthread_mutex_lock(&hc->hct_mutex[i]);
+	for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
+	    if (scan->state == HCT_SENT) {
+		scan->state = HCT_REPLIED;
+		rhead = (void *)scan->rbuf;
+		rhead->error = ENOTCONN;
+		if (scan->waiting)
+		    pthread_cond_signal(&scan->cond);
+	    }
+	}
+	pthread_mutex_unlock(&hc->hct_mutex[i]);
+    }
+    pthread_mutex_unlock(&MasterMutex);
+    return(NULL);
+}
+
+#endif
+
 /*
  * This reads a command from fdin, fixes up the byte ordering, and returns
  * a pointer to HCHead.
  *
- * If id is -1 we do not match response id's
+ * The MasterMutex may or may not be held.  When threaded this command
+ * is serialized by a reader thread.
  */
 static
 struct HCHead *
-hcc_read_command(hctransaction_t trans, int anypkt)
+hcc_read_command(struct HostConf *hc, hctransaction_t trans)
 {
-    struct HostConf *hc = trans->hc;
     hctransaction_t fill;
     struct HCHead tmp;
     int aligned_bytes;
     int n;
     int r;
 
-#if USE_PTHREADS
-    /*
-     * Shortcut if the reply has already been read in by another thread.
-     */
-    if (trans->state == HCT_REPLIED) {
-	return((void *)trans->rbuf);
+    n = 0;
+    while (n < (int)sizeof(struct HCHead)) {
+	r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n);
+	if (r <= 0)
+	    goto fail;
+	n += r;
     }
-    pthread_mutex_unlock(&MasterMutex);
-    pthread_mutex_lock(&hc->read_mutex);
-#endif
-    while (trans->state != HCT_REPLIED) {
-	n = 0;
-	while (n < (int)sizeof(struct HCHead)) {
-	    r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n);
-	    if (r <= 0)
-		goto fail;
-	    n += r;
-	}
 
-	assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536);
-	assert(tmp.magic == HCMAGIC);
+    assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536);
+    assert(tmp.magic == HCMAGIC);
 
-	if (anypkt == 0 && tmp.id != trans->id && trans->hc->version > 0) {
+    if (trans) {
+	fill = trans;
+    } else {
 #if USE_PTHREADS
-	    for (fill = trans->hc->hct_hash[tmp.id & HCTHASH_MASK];
-		 fill;
-		 fill = fill->next)
-	    {
-		if (fill->state == HCT_SENT && fill->id == tmp.id)
-			break;
-	    }
-	    if (fill == NULL)
+	pthread_mutex_lock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
+	for (fill = hc->hct_hash[tmp.id & HCTHASH_MASK];
+	     fill;
+	     fill = fill->next)
+	{
+	    if (fill->state == HCT_SENT && fill->id == tmp.id)
+		    break;
+	}
+	pthread_mutex_unlock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
+	if (fill == NULL)
 #endif
-	    {
-		fprintf(stderr, 
-			"cpdup hlink protocol error with %s (%04x %04x)\n",
-			trans->hc->host, trans->id, tmp.id);
-		exit(1);
-	    }
-	} else {
-	    fill = trans;
+	{
+	    fprintf(stderr, 
+		    "cpdup hlink protocol error with %s (%04x %04x)\n",
+		    hc->host, trans->id, tmp.id);
+	    exit(1);
 	}
+    }
 
-	bcopy(&tmp, fill->rbuf, n);
-	aligned_bytes = HCC_ALIGN(tmp.bytes);
+    bcopy(&tmp, fill->rbuf, n);
+    aligned_bytes = HCC_ALIGN(tmp.bytes);
 
-	while (n < aligned_bytes) {
-	    r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n);
-	    if (r <= 0)
-		goto fail;
-	    n += r;
-	}
+    while (n < aligned_bytes) {
+	r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n);
+	if (r <= 0)
+	    goto fail;
+	n += r;
+    }
 #ifdef DEBUG
-	hcc_debug_dump(head);
+    hcc_debug_dump(head);
 #endif
-	fill->state = HCT_REPLIED;
-    }
 #if USE_PTHREADS
-    pthread_mutex_lock(&MasterMutex);
-    pthread_mutex_unlock(&hc->read_mutex);
+    pthread_mutex_lock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
 #endif
-    return((void *)trans->rbuf);
-fail:
+    fill->state = HCT_REPLIED;
 #if USE_PTHREADS
-    pthread_mutex_lock(&MasterMutex);
-    pthread_mutex_unlock(&hc->read_mutex);
+    if (fill->waiting)
+	pthread_cond_signal(&fill->cond);
+    pthread_mutex_unlock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
 #endif
+    return((void *)fill->rbuf);
+fail:
     return(NULL);
 }
 
@@ -240,6 +294,7 @@
 
     i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
 
+    pthread_mutex_lock(&hc->hct_mutex[i]);
     for (trans = hc->hct_hash[i]; trans; trans = trans->next) {
 	if (trans->tid == tid)
 		break;
@@ -249,6 +304,7 @@
 	bzero(trans, sizeof(*trans));
 	trans->tid = tid;
 	trans->id = i;
+	pthread_cond_init(&trans->cond, NULL);
 	do {
 		for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
 			if (scan->id == trans->id) {
@@ -261,9 +317,33 @@
 	trans->next = hc->hct_hash[i];
 	hc->hct_hash[i] = trans;
     }
+    pthread_mutex_unlock(&hc->hct_mutex[i]);
     return(trans);
 }
 
+void
+hcc_free_trans(struct HostConf *hc)
+{
+    hctransaction_t trans;
+    hctransaction_t *transp;
+    pthread_t tid = pthread_self();
+    int i;
+
+    i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
+
+    pthread_mutex_lock(&hc->hct_mutex[i]);
+    for (transp = &hc->hct_hash[i]; *transp; transp = &trans->next) {
+	trans = *transp;
+	if (trans->tid == tid) {
+		*transp = trans->next;
+		pthread_cond_destroy(&trans->cond);
+		free(trans);
+		break;
+	}
+    }
+    pthread_mutex_unlock(&hc->hct_mutex[i]);
+}
+
 #else
 
 static
@@ -273,6 +353,12 @@
     return(&hc->trans);
 }
 
+void
+hcc_free_trans(struct HostConf *hc __unused)
+{
+    /* nop */
+}
+
 #endif
 
 /*
@@ -321,18 +407,20 @@
 struct HCHead *
 hcc_finish_command(hctransaction_t trans)
 {
+    struct HostConf *hc;
     struct HCHead *whead;
     struct HCHead *rhead;
     int aligned_bytes;
     int16_t wcmd;
 
+    hc = trans->hc;
     whead = (void *)trans->wbuf;
     whead->bytes = trans->windex;
     aligned_bytes = HCC_ALIGN(trans->windex);
 
     trans->state = HCT_SENT;
 
-    if (write(trans->hc->fdout, whead, aligned_bytes) != aligned_bytes) {
+    if (write(hc->fdout, whead, aligned_bytes) != aligned_bytes) {
 #ifdef __error
 	*__error = EIO;
 #else
@@ -340,7 +428,7 @@
 #endif
 	if (whead->cmd < 0x0010)
 		return(NULL);
-	fprintf(stderr, "cpdup lost connection to %s\n", trans->hc->host);
+	fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
 	exit(1);
     }
 
@@ -350,7 +438,23 @@
      * whead is invalid when we call hcc_read_command() because
      * we may switch to another thread.
      */
-    if ((rhead = hcc_read_command(trans, 0)) == NULL) {
+#if USE_PTHREADS
+    pthread_mutex_unlock(&MasterMutex);
+    while (trans->state != HCT_REPLIED && hc->reader_thread) {
+	pthread_mutex_t *mtxp = &hc->hct_mutex[trans->id & HCTHASH_MASK];
+	pthread_mutex_lock(mtxp);
+	trans->waiting = 1;
+	if (trans->state != HCT_REPLIED && hc->reader_thread)
+		pthread_cond_wait(&trans->cond, mtxp);
+	trans->waiting = 0;
+	pthread_mutex_unlock(mtxp);
+    }
+    pthread_mutex_lock(&MasterMutex);
+    rhead = (void *)trans->rbuf;
+#else
+    rhead = hcc_read_command(hc, trans);
+#endif
+    if (trans->state != HCT_REPLIED || rhead->id != trans->id) {
 #ifdef __error
 	*__error = EIO;
 #else
@@ -358,9 +462,10 @@
 #endif
 	if (wcmd < 0x0010)
 		return(NULL);
-	fprintf(stderr, "cpdup lost connection to %s\n", trans->hc->host);
+	fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
 	exit(1);
     }
+    trans->state = HCT_DONE;
 
     if (rhead->error) {
 #ifdef __error
Index: PORTING
===================================================================
RCS file: /home/cvs/src/bin/cpdup/PORTING,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/PORTING -L bin/cpdup/PORTING -u -r1.2 -r1.3
--- bin/cpdup/PORTING
+++ bin/cpdup/PORTING
@@ -1,5 +1,5 @@
 $MIdnightBSD$
-$DragonFly: src/bin/cpdup/PORTING,v 1.1 2006/09/16 21:57:08 dillon Exp $
+$DragonFly: src/bin/cpdup/PORTING,v 1.2 2008/04/11 17:33:11 dillon Exp $
 
 				PORTING CPDUP
 
@@ -24,8 +24,10 @@
     cd cpdup
     rm -f md5.c
     rm -f *.o
-    cc -c -D__unused= -D_GNU_SOURCE -D__USE_FILE_OFFSET64 -DNOMD5 *.c
-    cc -D__unused= -D_GNU_SOURCE -D__USE_FILE_OFFSET64 -DNOMD5 *.o -o ~/bin/cpdup
+    cc -D__unused= -D_GNU_SOURCE -D__USE_FILE_OFFSET64 -DNOMD5 \
+	-DUSE_PTHREADS -pthread *.c -c
+    cc -D__unused= -D_GNU_SOURCE -D__USE_FILE_OFFSET64 -DNOMD5 \
+	-DUSE_PTHREADS -pthread *.o -o ~/bin/cpdup
 
     BACKUP SCRIPT MODIFICATIONS - you will almost certainly have to adjust
     the do_cleanup script to extract the proper field(s) from the df output.
Index: cpdup.c
===================================================================
RCS file: /home/cvs/src/bin/cpdup/cpdup.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/cpdup.c -L bin/cpdup/cpdup.c -u -r1.2 -r1.3
--- bin/cpdup/cpdup.c
+++ bin/cpdup/cpdup.c
@@ -65,6 +65,12 @@
 #define HLSIZE	8192
 #define HLMASK	(HLSIZE - 1)
 
+#define MAXDEPTH	32	/* max copy depth for thread */
+#define GETBUFSIZE	8192
+#define GETPATHSIZE	2048
+#define GETLINKSIZE	1024
+#define GETIOSIZE	65536
+
 #ifndef _ST_FLAGS_PRESENT_
 #define st_flags	st_mode
 #endif
@@ -84,6 +90,7 @@
 struct hlink {
     ino_t ino;
     ino_t dino;
+    int	refs;
     struct hlink *next;
     struct hlink *prev;
     nlink_t nlinked;
@@ -115,11 +122,12 @@
 static int validate_check(const char *spath, const char *dpath);
 static int shash(const char *s);
 static void hltdelete(struct hlink *);
+static void hltsetdino(struct hlink *, ino_t);
 int YesNo(const char *path);
 static int xrename(const char *src, const char *dst, u_long flags);
 static int xlink(const char *src, const char *dst, u_long flags);
 int WildCmp(const char *s1, const char *s2);
-static int DoCopy(copy_info_t info);
+static int DoCopy(copy_info_t info, int depth);
 
 int AskConfirmation = 1;
 int SafetyOpt = 1;
@@ -131,14 +139,14 @@
 int UseMD5Opt;
 int UseFSMIDOpt;
 int SummaryOpt;
+int CompressOpt;
 int SlaveOpt;
 int EnableDirectoryRetries;
 int DstBaseLen;
 int ValidateOpt;
 int CurParallel;
 int MaxParallel = -1;
-char IOBuf1[65536];
-char IOBuf2[65536];
+int HardLinkCount;
 const char *UseCpFile;
 const char *UseHLPath;
 const char *MD5CacheFile;
@@ -173,8 +181,10 @@
     signal(SIGPIPE, SIG_IGN);
 
 #if USE_PTHREADS
-    pthread_mutex_init(&SrcHost.read_mutex, NULL);
-    pthread_mutex_init(&DstHost.read_mutex, NULL);
+    for (i = 0; i < HCTHASH_SIZE; ++i) {
+	pthread_mutex_init(&SrcHost.hct_mutex[i], NULL);
+	pthread_mutex_init(&DstHost.hct_mutex[i], NULL);
+    }
     pthread_mutex_init(&MasterMutex, NULL);
     pthread_mutex_lock(&MasterMutex);
 #endif
@@ -201,6 +211,9 @@
 	    v = strtol(ptr, NULL, 0);
 
 	switch(ptr[-1]) {
+	case 'C':
+	    CompressOpt = 1;
+	    break;
 	case 'v':
 	    VerboseOpt = 1;
 	    while (*ptr == 'v') {
@@ -303,7 +316,7 @@
 	    exit(1);
 	}
 	if (hc_connect(&SrcHost) < 0)
-	    fprintf(stderr, "Unable to connect to %s\n", SrcHost.host);
+	    exit(1);
     }
     if (dst && (ptr = strchr(dst, ':')) != NULL) {
 	asprintf(&DstHost.host, "%*.*s", ptr - dst, ptr - dst, dst);
@@ -313,7 +326,7 @@
 	    exit(1);
 	}
 	if (hc_connect(&DstHost) < 0)
-	    fprintf(stderr, "Unable to connect to %s\n", DstHost.host);
+	    exit(1);
     }
 
     /*
@@ -324,6 +337,7 @@
 	fatal(NULL);
 	/* not reached */
     }
+    bzero(&info, sizeof(info));
 #if USE_PTHREADS
     info.r = 0;
     info.children = 0;
@@ -335,13 +349,13 @@
 	info.dpath = dst;
 	info.sdevNo = (dev_t)-1;
 	info.ddevNo = (dev_t)-1;
-	i = DoCopy(&info);
+	i = DoCopy(&info, -1);
     } else {
 	info.spath = src;
 	info.dpath = NULL;
 	info.sdevNo = (dev_t)-1;
 	info.ddevNo = (dev_t)-1;
-	i = DoCopy(&info);
+	i = DoCopy(&info, -1);
     }
 #if USE_PTHREADS
     pthread_cond_destroy(&info.cond);
@@ -352,7 +366,7 @@
     fsmid_flush();
 
     if (SummaryOpt && i == 0) {
-	long duration;
+	double duration;
 	struct timeval end;
 
 	gettimeofday(&end, NULL);
@@ -364,10 +378,10 @@
 	CountWriteBytes +=  sizeof(struct stat) * CountRemovedItems;
 #endif
 
-	duration = end.tv_sec - start.tv_sec;
-	duration *= 1000000;
-	duration += end.tv_usec - start.tv_usec;
-	if (duration == 0) duration = 1;
+	duration = (end.tv_sec - start.tv_sec);
+	duration += (double)(end.tv_usec - start.tv_usec) / 1000000.0;
+	if (duration == 0.0)
+		duration = 1.0;
 	logstd("cpdup completed successfully\n");
 	logstd("%lld bytes source, %lld src bytes read, %lld tgt bytes read\n"
 	       "%lld bytes written (%.1fX speedup)\n",
@@ -383,9 +397,9 @@
 	    (long long)CountLinkedItems,
 	    (long long)CountRemovedItems);
 	logstd("%.1f seconds %5d Kbytes/sec synced %5d Kbytes/sec scanned\n",
-	    (float)duration / (float)1000000,
-	    (long)((long)1000000 * (CountSourceReadBytes + CountTargetReadBytes + CountWriteBytes) / duration  / 1024.0),
-	    (long)((long)1000000 * CountSourceBytes / duration / 1024.0));
+	    duration,
+	    (int)((CountSourceReadBytes + CountTargetReadBytes + CountWriteBytes) / duration  / 1024.0),
+	    (int)(CountSourceBytes / duration / 1024.0));
     }
     exit((i == 0) ? 0 : 1);
 }
@@ -393,14 +407,36 @@
 static struct hlink *
 hltlookup(struct stat *stp)
 {
+#if USE_PTHREADS
+    struct timespec ts = { 0, 100000 };
+#endif
     struct hlink *hl;
     int n;
 
     n = stp->st_ino & HLMASK;
 
-    for (hl = hltable[n]; hl; hl = hl->next)
-        if (hl->ino == stp->st_ino)
-              return hl;
+#if USE_PTHREADS
+again:
+#endif
+    for (hl = hltable[n]; hl; hl = hl->next) {
+        if (hl->ino == stp->st_ino) {
+#if USE_PTHREADS
+	    /*
+	     * If the hl entry is still in the process of being created
+	     * by another thread we have to wait until it has either been
+	     * deleted or completed.
+	     */
+	    if (hl->refs) {
+		pthread_mutex_unlock(&MasterMutex);
+		nanosleep(&ts, NULL);
+		pthread_mutex_lock(&MasterMutex);
+		goto again;
+	    }
+#endif
+	    ++hl->refs;
+	    return hl;
+	}
+    }
 
     return NULL;
 }
@@ -417,10 +453,12 @@
         fprintf(stderr, "out of memory\n");
         exit(EXIT_FAILURE);
     }
+    ++HardLinkCount;
 
     /* initialize and link the new element into the table */
     new->ino = stp->st_ino;
-    new->dino = 0;
+    new->dino = (ino_t)-1;
+    new->refs = 1;
     bcopy(path, new->name, plen + 1);
     new->nlinked = 1;
     new->prev = NULL;
@@ -434,8 +472,16 @@
 }
 
 static void
+hltsetdino(struct hlink *hl, ino_t inum)
+{
+    hl->dino = inum;
+}
+
+static void
 hltdelete(struct hlink *hl)
 {
+    assert(hl->refs == 1);
+    --hl->refs;
     if (hl->prev) {
         if (hl->next)
             hl->next->prev = hl->prev;
@@ -446,10 +492,17 @@
 
         hltable[hl->ino & HLMASK] = hl->next;
     }
-
+    --HardLinkCount;
     free(hl);
 }
 
+static void
+hltrels(struct hlink *hl)
+{
+    assert(hl->refs == 1);
+    --hl->refs;
+}
+
 /*
  * If UseHLPath is defined check to see if the file in question is
  * the same as the source file, and if it is return a pointer to the
@@ -508,17 +561,21 @@
     if (fd1 >= 0 && fd2 >= 0) {
 	int n;
 	int x;
+	char *iobuf1 = malloc(GETIOSIZE);
+	char *iobuf2 = malloc(GETIOSIZE);
 
-	while ((n = hc_read(&SrcHost, fd1, IOBuf1, sizeof(IOBuf1))) > 0) {
+	while ((n = hc_read(&SrcHost, fd1, iobuf1, GETIOSIZE)) > 0) {
 	    CountSourceReadBytes += n;
-	    x = hc_read(&DstHost, fd2, IOBuf2, sizeof(IOBuf2));
+	    x = hc_read(&DstHost, fd2, iobuf2, GETIOSIZE);
 	    if (x > 0)
 		    CountTargetReadBytes += x;
 	    if (x != n)
 		break;
-	    if (bcmp(IOBuf1, IOBuf2, n) != 0)
+	    if (bcmp(iobuf1, iobuf2, n) != 0)
 		break;
 	}
+	free(iobuf1);
+	free(iobuf2);
 	if (n == 0)
 	    error = 0;
     }
@@ -536,27 +593,32 @@
     copy_info_t cinfo = arg;
     char *spath = cinfo->spath;
     char *dpath = cinfo->dpath;
+    int r;
  
+    r = pthread_detach(pthread_self());
+    assert(r == 0);
     pthread_cond_init(&cinfo->cond, NULL);
     pthread_mutex_lock(&MasterMutex);
-    cinfo->r += DoCopy(cinfo);
+    cinfo->r += DoCopy(cinfo, 0);
     /* cinfo arguments invalid on return */
     --cinfo->parent->children;
     --CurParallel;
     pthread_cond_signal(&cinfo->parent->cond);
-    pthread_mutex_unlock(&MasterMutex);
     free(spath);
     if (dpath)
 	free(dpath);
     pthread_cond_destroy(&cinfo->cond);
     free(cinfo);
+    hcc_free_trans(&SrcHost);
+    hcc_free_trans(&DstHost);
+    pthread_mutex_unlock(&MasterMutex);
     return(NULL);
 }
 
 #endif
 
 int
-DoCopy(copy_info_t info)
+DoCopy(copy_info_t info, int depth)
 {
     const char *spath = info->spath;
     const char *dpath = info->dpath;
@@ -602,8 +664,10 @@
 		     */
 		    if (VerboseOpt >= 3)
 			logstd("%-32s nochange\n", (dpath) ? dpath : spath);
-                    if (hln->nlinked == st1.st_nlink)
+                    if (hln->nlinked == st1.st_nlink) {
                         hltdelete(hln);
+			hln = NULL;
+		    }
 		    CountSourceItems++;
 		    r = 0;
 		    goto done;
@@ -615,7 +679,9 @@
 			logerr("%-32s hardlink: unable to unlink: %s\n", 
 			    ((dpath) ? dpath : spath), strerror(errno));
                         hltdelete(hln);
-			return (r + 1);
+			hln = NULL;
+			++r;
+			goto done;
 		    }
                 }
             }
@@ -711,7 +777,7 @@
 		    validate_check(spath, dpath) == 0)
 	    ) {
                 if (hln)
-                    hln->dino = st2.st_ino;
+		    hltsetdino(hln, st2.st_ino);
 		if (VerboseOpt >= 3) {
 #ifndef NOMD5
 		    if (UseMD5Opt)
@@ -812,7 +878,7 @@
 
 	    if (UseCpFile) {
 		FILE *fi;
-		char buf[8192];
+		char *buf = malloc(GETBUFSIZE);
 		char *fpath;
 
 		if (UseCpFile[0] == '/') {
@@ -822,7 +888,7 @@
 		}
 		AddList(list, strrchr(fpath, '/') + 1, 1);
 		if ((fi = fopen(fpath, "r")) != NULL) {
-		    while (fgets(buf, sizeof(buf), fi) != NULL) {
+		    while (fgets(buf, GETBUFSIZE, fi) != NULL) {
 			int l = strlen(buf);
 			CountSourceReadBytes += l;
 			if (l && buf[l-1] == '\n')
@@ -833,6 +899,7 @@
 		    fclose(fi);
 		}
 		free(fpath);
+		free(buf);
 	    }
 
 	    /*
@@ -870,9 +937,9 @@
 		    ndpath = mprintf("%s/%s", dpath, den->d_name);
 
 #if USE_PTHREADS
-		if (CurParallel < MaxParallel) {
+		if (CurParallel < MaxParallel || depth > MAXDEPTH) {
 		    copy_info_t cinfo = malloc(sizeof(*cinfo));
-		    pthread_t dummy_thr = NULL;
+		    pthread_t dummy_thr;
 
 		    bzero(cinfo, sizeof(*cinfo));
 		    cinfo->spath = nspath;
@@ -890,10 +957,15 @@
 		    info->dpath = ndpath;
 		    info->sdevNo = sdevNo;
 		    info->ddevNo = ddevNo;
-		    r += DoCopy(info);
+		    if (depth < 0)
+			r += DoCopy(info, depth);
+		    else
+			r += DoCopy(info, depth + 1);
 		    free(nspath);
 		    if (ndpath)
 			free(ndpath);
+		    info->spath = NULL;
+		    info->dpath = NULL;
 		}
 	    }
 
@@ -992,7 +1064,7 @@
 	int fd1;
 	int fd2;
 
-	path = mprintf("%s.tmp", dpath);
+	path = mprintf("%s.tmp%d", dpath, (int)getpid());
 
 	/*
 	 * Handle check failure message.
@@ -1047,15 +1119,16 @@
 	    }
 	    if (fd2 >= 0) {
 		const char *op;
+		char *iobuf1 = malloc(GETIOSIZE);
 		int n;
 
 		/*
 		 * Matt: What about holes?
 		 */
 		op = "read";
-		while ((n = hc_read(&SrcHost, fd1, IOBuf1, sizeof(IOBuf1))) > 0) {
+		while ((n = hc_read(&SrcHost, fd1, iobuf1, GETIOSIZE)) > 0) {
 		    op = "write";
-		    if (hc_write(&DstHost, fd2, IOBuf1, n) != n)
+		    if (hc_write(&DstHost, fd2, iobuf1, n) != n)
 			break;
 		    op = "read";
 		}
@@ -1095,6 +1168,7 @@
 		    hc_remove(&DstHost, path);
 		    ++r;
 		}
+		free(iobuf1);
 	    } else {
 		logerr("%-32s create (uid %d, euid %d) failed: %s\n",
 		    (dpath ? dpath : spath), getuid(), geteuid(),
@@ -1114,21 +1188,23 @@
 	free(path);
 
         if (hln) {
-            if (!r && hc_stat(&DstHost, dpath, &st2) == 0)
-                hln->dino = st2.st_ino;
-            else
+            if (!r && hc_stat(&DstHost, dpath, &st2) == 0) {
+		hltsetdino(hln, st2.st_ino);
+	    } else {
                 hltdelete(hln);
+		hln = NULL;
+	    }
         }
     } else if (S_ISLNK(st1.st_mode)) {
-	char link1[1024];
-	char link2[1024];
-	char path[2048];
+	char *link1 = malloc(GETLINKSIZE);
+	char *link2 = malloc(GETLINKSIZE);
+	char *path = malloc(GETPATHSIZE);
 	int n1;
 	int n2;
 
-	snprintf(path, sizeof(path), "%s.tmp", dpath);
-	n1 = hc_readlink(&SrcHost, spath, link1, sizeof(link1) - 1);
-	n2 = hc_readlink(&DstHost, dpath, link2, sizeof(link2) - 1);
+	snprintf(path, GETPATHSIZE, "%s.tmp%d", dpath, (int)getpid());
+	n1 = hc_readlink(&SrcHost, spath, link1, GETLINKSIZE - 1);
+	n2 = hc_readlink(&DstHost, dpath, link2, GETLINKSIZE - 1);
 	if (n1 >= 0) {
 	    if (ForceOpt || n1 != n2 || bcmp(link1, link2, n1) != 0) {
 		hc_umask(&DstHost, ~st1.st_mode);
@@ -1170,8 +1246,11 @@
 	    r = 1;
 	    logerr("%-32s softlink-failed\n", (dpath ? dpath : spath));
 	}
+	free(link1);
+	free(link2);
+	free(path);
     } else if ((S_ISCHR(st1.st_mode) || S_ISBLK(st1.st_mode)) && DeviceOpt) {
-	char path[2048];
+	char *path = malloc(GETPATHSIZE);
 
 	if (ForceOpt ||
 	    st2Valid == 0 || 
@@ -1180,10 +1259,10 @@
 	    st1.st_uid != st2.st_uid ||
 	    st1.st_gid != st2.st_gid
 	) {
-	    snprintf(path, sizeof(path), "%s.tmp", dpath);
+	    snprintf(path, GETPATHSIZE, "%s.tmp%d", dpath, (int)getpid());
 
 	    hc_remove(&DstHost, path);
-	    if (mknod(path, st1.st_mode, st1.st_rdev) == 0) {
+	    if (hc_mknod(&DstHost, path, st1.st_mode, st1.st_rdev) == 0) {
 		hc_chmod(&DstHost, path, st1.st_mode);
 		hc_chown(&DstHost, path, st1.st_uid, st1.st_gid);
 		hc_remove(&DstHost, dpath);
@@ -1206,9 +1285,18 @@
 	    if (VerboseOpt >= 3)
 		logstd("%-32s nochange\n", (dpath ? dpath : spath));
 	}
+	free(path);
 	CountSourceItems++;
     }
 done:
+    if (hln) {
+	if (hln->dino == (ino_t)-1) {
+	    hltdelete(hln);
+	    /*hln = NULL; unneeded */
+	} else {
+	    hltrels(hln);
+	}
+    }
     ResetList(list);
     free(list);
     return (r);


More information about the Midnightbsd-cvs mailing list