[Midnightbsd-cvs] src: bin/cpdup: Bring in changes from DragonFly to MidnightBSD version
laffer1 at midnightbsd.org
laffer1 at midnightbsd.org
Sat Aug 16 23:04:22 EDT 2008
Log Message:
-----------
Bring in changes from DragonFly to MidnightBSD version of cpdup.
Update to 1.11 (previous version 1.09).
This includes some improvements to the multithreaded code as well as fixes for the protocol. As older versions are not compatible, it's important to run the same versions. This should work with DragonFly 2.0's cpdup.
Modified Files:
--------------
src/bin/cpdup:
PORTING (r1.2 -> r1.3)
cpdup.1 (r1.2 -> r1.3)
cpdup.c (r1.2 -> r1.3)
cpdup.h (r1.2 -> r1.3)
hclink.c (r1.2 -> r1.3)
hclink.h (r1.2 -> r1.3)
hcproto.c (r1.2 -> r1.3)
hcproto.h (r1.2 -> r1.3)
misc.c (r1.2 -> r1.3)
-------------- next part --------------
Index: hcproto.c
===================================================================
RCS file: /home/cvs/src/bin/cpdup/hcproto.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/hcproto.c -L bin/cpdup/hcproto.c -u -r1.2 -r1.3
--- bin/cpdup/hcproto.c
+++ bin/cpdup/hcproto.c
@@ -3,7 +3,7 @@
*
* This module implements a simple remote control protocol
*
- * $DragonFly: src/bin/cpdup/hcproto.c,v 1.2 2008/04/10 22:09:08 dillon Exp $
+ * $DragonFly: src/bin/cpdup/hcproto.c,v 1.6 2008/04/16 17:38:19 dillon Exp $
*/
#include "cpdup.h"
@@ -29,6 +29,7 @@
static int rc_chown(hctransaction_t trans, struct HCHead *);
static int rc_lchown(hctransaction_t trans, struct HCHead *);
static int rc_chmod(hctransaction_t trans, struct HCHead *);
+static int rc_mknod(hctransaction_t trans, struct HCHead *);
static int rc_link(hctransaction_t trans, struct HCHead *);
#ifdef _ST_FLAGS_PRESENT_
static int rc_chflags(hctransaction_t trans, struct HCHead *);
@@ -56,6 +57,7 @@
{ HC_CHOWN, rc_chown },
{ HC_LCHOWN, rc_lchown },
{ HC_CHMOD, rc_chmod },
+ { HC_MKNOD, rc_mknod },
{ HC_LINK, rc_link },
#ifdef _ST_FLAGS_PRESENT_
{ HC_CHFLAGS, rc_chflags },
@@ -70,8 +72,10 @@
int
hc_connect(struct HostConf *hc)
{
- if (hcc_connect(hc) < 0)
+ if (hcc_connect(hc) < 0) {
+ fprintf(stderr, "Unable to connect to %s\n", hc->host);
return(-1);
+ }
return(hc_hello(hc));
}
@@ -103,9 +107,12 @@
trans = hcc_start_command(hc, HC_HELLO);
hcc_leaf_string(trans, LC_HELLOSTR, hostbuf);
- hcc_leaf_int32(trans, LC_VERSION, 1);
- if ((head = hcc_finish_command(trans)) == NULL)
+ hcc_leaf_int32(trans, LC_VERSION, HCPROTO_VERSION);
+ if ((head = hcc_finish_command(trans)) == NULL) {
+ fprintf(stderr, "Connected to %s but remote failed to complete hello\n",
+ hc->host);
return(-1);
+ }
if (head->error) {
fprintf(stderr, "Connected to %s but remote returned error %d\n",
@@ -125,6 +132,11 @@
break;
}
}
+ if (hc->version < HCPROTO_VERSION_COMPAT) {
+ fprintf(stderr, "Remote cpdup at %s has an incompatible version\n",
+ hc->host);
+ error = -1;
+ }
if (error < 0)
fprintf(stderr, "Handshake failed with %s\n", hc->host);
return (error);
@@ -142,7 +154,7 @@
hostbuf[0] = '?';
hcc_leaf_string(trans, LC_HELLOSTR, hostbuf);
- hcc_leaf_int32(trans, LC_VERSION, 1);
+ hcc_leaf_int32(trans, LC_VERSION, HCPROTO_VERSION);
return(0);
}
@@ -492,6 +504,8 @@
switch(item->leafid) {
case LC_DESCRIPTOR:
dir = hcc_get_descriptor(trans->hc, HCC_INT32(item), HC_DESC_DIR);
+ if (dir != NULL)
+ hcc_set_descriptor(trans->hc, HCC_INT32(item), NULL, HC_DESC_DIR);
break;
}
}
@@ -660,6 +674,22 @@
return(close(fd));
}
+static int
+getiolimit(void)
+{
+#if USE_PTHREADS
+ if (CurParallel < 2)
+ return(32768);
+ if (CurParallel < 4)
+ return(16384);
+ if (CurParallel < 8)
+ return(8192);
+ return(4096);
+#else
+ return(32768);
+#endif
+}
+
/*
* READ
*/
@@ -679,7 +709,8 @@
if (fdp) {
r = 0;
while (bytes) {
- int n = (bytes > 32768) ? 32768 : bytes;
+ size_t limit = getiolimit();
+ int n = (bytes > limit) ? limit : bytes;
int x = 0;
trans = hcc_start_command(hc, HC_READ);
@@ -762,7 +793,8 @@
if (fdp) {
r = 0;
while (bytes) {
- int n = (bytes > 32768) ? 32768 : bytes;
+ size_t limit = getiolimit();
+ int n = (bytes > limit) ? limit : bytes;
int x = 0;
trans = hcc_start_command(hc, HC_WRITE);
@@ -1090,6 +1122,55 @@
}
/*
+ * MKNOD
+ */
+int
+hc_mknod(struct HostConf *hc, const char *path, mode_t mode, dev_t rdev)
+{
+ hctransaction_t trans;
+ struct HCHead *head;
+
+ if (hc == NULL || hc->host == NULL)
+ return(mknod(path, mode, rdev));
+
+ trans = hcc_start_command(hc, HC_MKNOD);
+ hcc_leaf_string(trans, LC_PATH1, path);
+ hcc_leaf_int32(trans, LC_MODE, mode);
+ hcc_leaf_int32(trans, LC_RDEV, rdev);
+ if ((head = hcc_finish_command(trans)) == NULL)
+ return(-1);
+ if (head->error)
+ return(-1);
+ return(0);
+}
+
+static int
+rc_mknod(hctransaction_t trans __unused, struct HCHead *head)
+{
+ struct HCLeaf *item;
+ const char *path = NULL;
+ mode_t mode = 0666;
+ dev_t rdev = 0;
+
+ for (item = hcc_firstitem(head); item; item = hcc_nextitem(head, item)) {
+ switch(item->leafid) {
+ case LC_PATH1:
+ path = HCC_STRING(item);
+ break;
+ case LC_MODE:
+ mode = HCC_INT32(item);
+ break;
+ case LC_RDEV:
+ rdev = HCC_INT32(item);
+ break;
+ }
+ }
+ if (path == NULL)
+ return(-1);
+ return(mknod(path, mode, rdev));
+}
+
+/*
* LINK
*/
int
Index: hclink.h
===================================================================
RCS file: /home/cvs/src/bin/cpdup/hclink.h,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/hclink.h -L bin/cpdup/hclink.h -u -r1.2 -r1.3
--- bin/cpdup/hclink.h
+++ bin/cpdup/hclink.h
@@ -22,9 +22,11 @@
struct HostConf *hc;
u_int16_t id; /* assigned transaction id */
int windex; /* output buffer index */
- enum { HCT_IDLE, HCT_SENT, HCT_REPLIED } state;
+ enum { HCT_IDLE, HCT_SENT, HCT_REPLIED, HCT_DONE } state;
#if USE_PTHREADS
pthread_t tid;
+ pthread_cond_t cond;
+ int waiting;
#endif
char rbuf[65536]; /* input buffer */
char wbuf[65536]; /* output buffer */
@@ -44,8 +46,9 @@
int version; /* cpdup protocol version */
struct HCHostDesc *hostdescs;
#if USE_PTHREADS
- pthread_mutex_t read_mutex;
+ pthread_mutex_t hct_mutex[HCTHASH_SIZE];
hctransaction_t hct_hash[HCTHASH_SIZE];
+ pthread_t reader_thread;
#else
struct HCTransaction trans;
#endif
@@ -113,6 +116,7 @@
struct HCLeaf *hcc_nextitem(struct HCHead *head, struct HCLeaf *item);
void hcc_debug_dump(struct HCHead *head);
+void hcc_free_trans(struct HostConf *hc);
#endif
Index: hcproto.h
===================================================================
RCS file: /home/cvs/src/bin/cpdup/hcproto.h,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/hcproto.h -L bin/cpdup/hcproto.h -u -r1.2 -r1.3
--- bin/cpdup/hcproto.h
+++ bin/cpdup/hcproto.h
@@ -8,6 +8,9 @@
#ifndef _HCPROTO_H_
#define _HCPROTO_H_
+#define HCPROTO_VERSION 2
+#define HCPROTO_VERSION_COMPAT 2
+
#define HC_HELLO 0x0001
#define HC_STAT 0x0010
@@ -32,6 +35,7 @@
#define HC_SYMLINK 0x0023
#define HC_RENAME 0x0024
#define HC_UTIMES 0x0025
+#define HC_MKNOD 0x0026
#define LC_HELLOSTR (0x0001|LCF_STRING)
#define LC_PATH1 (0x0010|LCF_STRING)
@@ -87,6 +91,7 @@
int hc_chown(struct HostConf *hc, const char *path, uid_t owner, gid_t group);
int hc_lchown(struct HostConf *hc, const char *path, uid_t owner, gid_t group);
int hc_chmod(struct HostConf *hc, const char *path, mode_t mode);
+int hc_mknod(struct HostConf *hc, const char *path, mode_t mode, dev_t rdev);
int hc_link(struct HostConf *hc, const char *name1, const char *name2);
int hc_chflags(struct HostConf *hc, const char *path, u_long flags);
int hc_readlink(struct HostConf *hc, const char *path, char *buf, int bufsiz);
Index: cpdup.1
===================================================================
RCS file: /home/cvs/src/bin/cpdup/cpdup.1,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/cpdup.1 -L bin/cpdup/cpdup.1 -u -r1.2 -r1.3
--- bin/cpdup/cpdup.1
+++ bin/cpdup/cpdup.1
@@ -4,7 +4,7 @@
.\"
.\" $MidnightBSD$
.\" $DragonFly: src/bin/cpdup/cpdup.1,v 1.28 2008/04/10 22:09:08 dillon Exp $
-.Dd March 22, 2008
+.Dd August 16, 2008
.Dt CPDUP 1
.Os
.Sh NAME
@@ -12,6 +12,7 @@
.Nd mirror filesystems
.Sh SYNOPSIS
.Nm
+.Op Fl C
.Op Fl v[vv..]
.Op Fl u
.Op Fl I
@@ -19,7 +20,8 @@
.Op Fl s0
.Op Fl i0
.Op Fl j0
-.Op Fl p<num>
+.Op Fl l
+.Op Fl p Ar number
.Op Fl q
.Op Fl o
.Op Fl m
@@ -64,6 +66,10 @@
.Pp
The following options are available:
.Bl -tag -width flag
+.It Fl C
+If the source or target is a remote host request that the
+.Xr ssh 1
+session be compressed.
.It Fl v[vv]
Set verboseness. By default
.Nm
@@ -102,8 +108,10 @@
Do not try to recreate CHR or BLK devices.
.It Fl l
Line buffer verbose output.
-.It Fl p<number>
-Use threaded transactions with up to the specified number of threads.
+.It Fl p Ar number
+Use threaded transactions with up to the specified
+.Ar number
+of threads.
This typically improves operation when a remote host specification is
given.
.It Fl q
Index: misc.c
===================================================================
RCS file: /home/cvs/src/bin/cpdup/misc.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/misc.c -L bin/cpdup/misc.c -u -r1.2 -r1.3
--- bin/cpdup/misc.c
+++ bin/cpdup/misc.c
@@ -85,6 +85,68 @@
return(s);
}
+#ifdef DEBUG_MALLOC
+
+#undef malloc
+#undef free
+
+struct malloc_info {
+ struct malloc_info *next;
+ struct malloc_info *prev;
+ const char *file;
+ int magic;
+ int line;
+};
+
+struct malloc_info DummyInfo = { &DummyInfo, &DummyInfo, NULL, 0, 0 };
+struct malloc_info *InfoList = &DummyInfo;
+
+void *
+debug_malloc(size_t bytes, const char *file, int line)
+{
+ struct malloc_info *info = malloc(sizeof(*info) + bytes);
+
+ info->magic = 0x5513A4C2;
+ info->file = file;
+ info->line = line;
+
+ info->next = InfoList;
+ info->prev = InfoList->prev;
+ info->next->prev = info;
+ info->prev->next = info;
+ return(info + 1);
+}
+
+void
+debug_free(void *ptr)
+{
+ struct malloc_info *info = (struct malloc_info *)ptr - 1;
+ struct malloc_info *scan;
+ static int report;
+
+ for (scan = DummyInfo.next; scan != &DummyInfo; scan = scan->next) {
+ if (info == scan) {
+ assert(info->magic == 0x5513A4C2);
+ info->magic = 0;
+ info->next->prev = info->prev;
+ info->prev->next = info->next;
+ free(info);
+ break;
+ }
+ }
+ if (scan == &DummyInfo)
+ free(ptr);
+
+ if ((++report & 65535) == 0) {
+ printf("--- report\n");
+ for (scan = DummyInfo.next; scan != &DummyInfo; scan = scan->next) {
+ printf("%-15s %d\n", scan->file, scan->line);
+ }
+ }
+}
+
+#endif
+
void
fatal(const char *ctl, ...)
{
@@ -92,7 +154,8 @@
if (ctl == NULL) {
puts("cpdup [<options>] src [dest]");
- puts(" -v[vv] verbose level (-vv is typical)\n"
+ puts(" -C request compressed ssh link if remote operation\n"
+ " -v[vv] verbose level (-vv is typical)\n"
" -u use unbuffered output for -v[vv]\n"
" -I display performance summary\n"
" -f force update even if files look the same\n"
@@ -118,7 +181,7 @@
#endif
" -x use .cpignore as exclusion file\n"
" -X file specify exclusion file\n"
- " Version 1.09 by Matt Dillon and Dima Ruban\n"
+ " Version 1.11 by Matt Dillon and Dima Ruban\n"
);
exit(0);
} else {
Index: cpdup.h
===================================================================
RCS file: /home/cvs/src/bin/cpdup/cpdup.h,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/cpdup.h -L bin/cpdup/cpdup.h -u -r1.2 -r1.3
--- bin/cpdup/cpdup.h
+++ bin/cpdup/cpdup.h
@@ -47,6 +47,8 @@
extern const char *FSMIDCacheFile;
extern int SummaryOpt;
+extern int CompressOpt;
+extern int CurParallel;
extern int64_t CountSourceBytes;
extern int64_t CountSourceItems;
@@ -59,3 +61,11 @@
#if USE_PTHREADS
extern pthread_mutex_t MasterMutex;
#endif
+
+#ifdef DEBUG_MALLOC
+void *debug_malloc(size_t bytes, const char *file, int line);
+void debug_free(void *ptr);
+
+#define malloc(bytes) debug_malloc(bytes, __FILE__, __LINE__)
+#define free(ptr) debug_free(ptr)
+#endif
Index: hclink.c
===================================================================
RCS file: /home/cvs/src/bin/cpdup/hclink.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/hclink.c -L bin/cpdup/hclink.c -u -r1.2 -r1.3
--- bin/cpdup/hclink.c
+++ bin/cpdup/hclink.c
@@ -11,7 +11,10 @@
#include "hclink.h"
#include "hcproto.h"
-static struct HCHead *hcc_read_command(hctransaction_t trans, int anypkt);
+#if USE_PTHREADS
+static void * hcc_reader_thread(void *arg);
+#endif
+static struct HCHead *hcc_read_command(struct HostConf *hc, hctransaction_t trans);
static void hcc_start_reply(hctransaction_t trans, struct HCHead *rhead);
int
@@ -19,6 +22,7 @@
{
int fdin[2];
int fdout[2];
+ const char *av[16];
if (hc == NULL || hc->host == NULL)
return(0);
@@ -34,13 +38,26 @@
/*
* Child process
*/
+ int n;
+
dup2(fdin[1], 1);
close(fdin[0]);
close(fdin[1]);
dup2(fdout[0], 0);
close(fdout[0]);
close(fdout[1]);
- execl("/usr/bin/ssh", "ssh", "-T", hc->host, "cpdup", "-S", (char *) NULL);
+
+ n = 0;
+ av[n++] = "ssh";
+ if (CompressOpt)
+ av[n++] = "-C";
+ av[n++] = "-T";
+ av[n++] = hc->host;
+ av[n++] = "cpdup";
+ av[n++] = "-S";
+ av[n++] = NULL;
+
+ execv("/usr/bin/ssh", (void *)av);
_exit(1);
} else if (hc->pid < 0) {
return(-1);
@@ -53,6 +70,9 @@
hc->fdin = fdin[0];
close(fdout[0]);
hc->fdout = fdout[1];
+#if USE_PTHREADS
+ pthread_create(&hc->reader_thread, NULL, hcc_reader_thread, hc);
+#endif
return(0);
}
}
@@ -91,6 +111,9 @@
hcslave.fdout = fdout;
trans.hc = &hcslave;
+#if USE_PTHREADS
+ pthread_mutex_unlock(&MasterMutex);
+#endif
/*
* Process commands on fdin and write out results on fdout
*/
@@ -98,7 +121,7 @@
/*
* Get the command
*/
- head = hcc_read_command(&trans, 1);
+ head = hcc_read_command(trans.hc, &trans);
if (head == NULL)
break;
@@ -140,90 +163,121 @@
return(0);
}
+#if USE_PTHREADS
+/*
+ * This thread collects responses from the link. It is run without
+ * the MasterMutex.
+ */
+static void *
+hcc_reader_thread(void *arg)
+{
+ struct HostConf *hc = arg;
+ struct HCHead *rhead;
+ hctransaction_t scan;
+ int i;
+
+ pthread_detach(pthread_self());
+ while (hcc_read_command(hc, NULL) != NULL)
+ ;
+ hc->reader_thread = NULL;
+
+ /*
+ * Clean up any threads stuck waiting for a reply.
+ */
+ pthread_mutex_lock(&MasterMutex);
+ for (i = 0; i < HCTHASH_SIZE; ++i) {
+ pthread_mutex_lock(&hc->hct_mutex[i]);
+ for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
+ if (scan->state == HCT_SENT) {
+ scan->state = HCT_REPLIED;
+ rhead = (void *)scan->rbuf;
+ rhead->error = ENOTCONN;
+ if (scan->waiting)
+ pthread_cond_signal(&scan->cond);
+ }
+ }
+ pthread_mutex_unlock(&hc->hct_mutex[i]);
+ }
+ pthread_mutex_unlock(&MasterMutex);
+ return(NULL);
+}
+
+#endif
+
/*
* This reads a command from fdin, fixes up the byte ordering, and returns
* a pointer to HCHead.
*
- * If id is -1 we do not match response id's
+ * The MasterMutex may or may not be held. When threaded this command
+ * is serialized by a reader thread.
*/
static
struct HCHead *
-hcc_read_command(hctransaction_t trans, int anypkt)
+hcc_read_command(struct HostConf *hc, hctransaction_t trans)
{
- struct HostConf *hc = trans->hc;
hctransaction_t fill;
struct HCHead tmp;
int aligned_bytes;
int n;
int r;
-#if USE_PTHREADS
- /*
- * Shortcut if the reply has already been read in by another thread.
- */
- if (trans->state == HCT_REPLIED) {
- return((void *)trans->rbuf);
+ n = 0;
+ while (n < (int)sizeof(struct HCHead)) {
+ r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n);
+ if (r <= 0)
+ goto fail;
+ n += r;
}
- pthread_mutex_unlock(&MasterMutex);
- pthread_mutex_lock(&hc->read_mutex);
-#endif
- while (trans->state != HCT_REPLIED) {
- n = 0;
- while (n < (int)sizeof(struct HCHead)) {
- r = read(hc->fdin, (char *)&tmp + n, sizeof(struct HCHead) - n);
- if (r <= 0)
- goto fail;
- n += r;
- }
- assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536);
- assert(tmp.magic == HCMAGIC);
+ assert(tmp.bytes >= (int)sizeof(tmp) && tmp.bytes < 65536);
+ assert(tmp.magic == HCMAGIC);
- if (anypkt == 0 && tmp.id != trans->id && trans->hc->version > 0) {
+ if (trans) {
+ fill = trans;
+ } else {
#if USE_PTHREADS
- for (fill = trans->hc->hct_hash[tmp.id & HCTHASH_MASK];
- fill;
- fill = fill->next)
- {
- if (fill->state == HCT_SENT && fill->id == tmp.id)
- break;
- }
- if (fill == NULL)
+ pthread_mutex_lock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
+ for (fill = hc->hct_hash[tmp.id & HCTHASH_MASK];
+ fill;
+ fill = fill->next)
+ {
+ if (fill->state == HCT_SENT && fill->id == tmp.id)
+ break;
+ }
+ pthread_mutex_unlock(&hc->hct_mutex[tmp.id & HCTHASH_MASK]);
+ if (fill == NULL)
#endif
- {
- fprintf(stderr,
- "cpdup hlink protocol error with %s (%04x %04x)\n",
- trans->hc->host, trans->id, tmp.id);
- exit(1);
- }
- } else {
- fill = trans;
+ {
+ fprintf(stderr,
+ "cpdup hlink protocol error with %s (%04x %04x)\n",
+ hc->host, trans->id, tmp.id);
+ exit(1);
}
+ }
- bcopy(&tmp, fill->rbuf, n);
- aligned_bytes = HCC_ALIGN(tmp.bytes);
+ bcopy(&tmp, fill->rbuf, n);
+ aligned_bytes = HCC_ALIGN(tmp.bytes);
- while (n < aligned_bytes) {
- r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n);
- if (r <= 0)
- goto fail;
- n += r;
- }
+ while (n < aligned_bytes) {
+ r = read(hc->fdin, fill->rbuf + n, aligned_bytes - n);
+ if (r <= 0)
+ goto fail;
+ n += r;
+ }
#ifdef DEBUG
- hcc_debug_dump(head);
+ hcc_debug_dump(head);
#endif
- fill->state = HCT_REPLIED;
- }
#if USE_PTHREADS
- pthread_mutex_lock(&MasterMutex);
- pthread_mutex_unlock(&hc->read_mutex);
+ pthread_mutex_lock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
#endif
- return((void *)trans->rbuf);
-fail:
+ fill->state = HCT_REPLIED;
#if USE_PTHREADS
- pthread_mutex_lock(&MasterMutex);
- pthread_mutex_unlock(&hc->read_mutex);
+ if (fill->waiting)
+ pthread_cond_signal(&fill->cond);
+ pthread_mutex_unlock(&hc->hct_mutex[fill->id & HCTHASH_MASK]);
#endif
+ return((void *)fill->rbuf);
+fail:
return(NULL);
}
@@ -240,6 +294,7 @@
i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
+ pthread_mutex_lock(&hc->hct_mutex[i]);
for (trans = hc->hct_hash[i]; trans; trans = trans->next) {
if (trans->tid == tid)
break;
@@ -249,6 +304,7 @@
bzero(trans, sizeof(*trans));
trans->tid = tid;
trans->id = i;
+ pthread_cond_init(&trans->cond, NULL);
do {
for (scan = hc->hct_hash[i]; scan; scan = scan->next) {
if (scan->id == trans->id) {
@@ -261,9 +317,33 @@
trans->next = hc->hct_hash[i];
hc->hct_hash[i] = trans;
}
+ pthread_mutex_unlock(&hc->hct_mutex[i]);
return(trans);
}
+void
+hcc_free_trans(struct HostConf *hc)
+{
+ hctransaction_t trans;
+ hctransaction_t *transp;
+ pthread_t tid = pthread_self();
+ int i;
+
+ i = ((intptr_t)tid >> 7) & HCTHASH_MASK;
+
+ pthread_mutex_lock(&hc->hct_mutex[i]);
+ for (transp = &hc->hct_hash[i]; *transp; transp = &trans->next) {
+ trans = *transp;
+ if (trans->tid == tid) {
+ *transp = trans->next;
+ pthread_cond_destroy(&trans->cond);
+ free(trans);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&hc->hct_mutex[i]);
+}
+
#else
static
@@ -273,6 +353,12 @@
return(&hc->trans);
}
+void
+hcc_free_trans(struct HostConf *hc __unused)
+{
+ /* nop */
+}
+
#endif
/*
@@ -321,18 +407,20 @@
struct HCHead *
hcc_finish_command(hctransaction_t trans)
{
+ struct HostConf *hc;
struct HCHead *whead;
struct HCHead *rhead;
int aligned_bytes;
int16_t wcmd;
+ hc = trans->hc;
whead = (void *)trans->wbuf;
whead->bytes = trans->windex;
aligned_bytes = HCC_ALIGN(trans->windex);
trans->state = HCT_SENT;
- if (write(trans->hc->fdout, whead, aligned_bytes) != aligned_bytes) {
+ if (write(hc->fdout, whead, aligned_bytes) != aligned_bytes) {
#ifdef __error
*__error = EIO;
#else
@@ -340,7 +428,7 @@
#endif
if (whead->cmd < 0x0010)
return(NULL);
- fprintf(stderr, "cpdup lost connection to %s\n", trans->hc->host);
+ fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
exit(1);
}
@@ -350,7 +438,23 @@
* whead is invalid when we call hcc_read_command() because
* we may switch to another thread.
*/
- if ((rhead = hcc_read_command(trans, 0)) == NULL) {
+#if USE_PTHREADS
+ pthread_mutex_unlock(&MasterMutex);
+ while (trans->state != HCT_REPLIED && hc->reader_thread) {
+ pthread_mutex_t *mtxp = &hc->hct_mutex[trans->id & HCTHASH_MASK];
+ pthread_mutex_lock(mtxp);
+ trans->waiting = 1;
+ if (trans->state != HCT_REPLIED && hc->reader_thread)
+ pthread_cond_wait(&trans->cond, mtxp);
+ trans->waiting = 0;
+ pthread_mutex_unlock(mtxp);
+ }
+ pthread_mutex_lock(&MasterMutex);
+ rhead = (void *)trans->rbuf;
+#else
+ rhead = hcc_read_command(hc, trans);
+#endif
+ if (trans->state != HCT_REPLIED || rhead->id != trans->id) {
#ifdef __error
*__error = EIO;
#else
@@ -358,9 +462,10 @@
#endif
if (wcmd < 0x0010)
return(NULL);
- fprintf(stderr, "cpdup lost connection to %s\n", trans->hc->host);
+ fprintf(stderr, "cpdup lost connection to %s\n", hc->host);
exit(1);
}
+ trans->state = HCT_DONE;
if (rhead->error) {
#ifdef __error
Index: PORTING
===================================================================
RCS file: /home/cvs/src/bin/cpdup/PORTING,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/PORTING -L bin/cpdup/PORTING -u -r1.2 -r1.3
--- bin/cpdup/PORTING
+++ bin/cpdup/PORTING
@@ -1,5 +1,5 @@
$MIdnightBSD$
-$DragonFly: src/bin/cpdup/PORTING,v 1.1 2006/09/16 21:57:08 dillon Exp $
+$DragonFly: src/bin/cpdup/PORTING,v 1.2 2008/04/11 17:33:11 dillon Exp $
PORTING CPDUP
@@ -24,8 +24,10 @@
cd cpdup
rm -f md5.c
rm -f *.o
- cc -c -D__unused= -D_GNU_SOURCE -D__USE_FILE_OFFSET64 -DNOMD5 *.c
- cc -D__unused= -D_GNU_SOURCE -D__USE_FILE_OFFSET64 -DNOMD5 *.o -o ~/bin/cpdup
+ cc -D__unused= -D_GNU_SOURCE -D__USE_FILE_OFFSET64 -DNOMD5 \
+ -DUSE_PTHREADS -pthread *.c -c
+ cc -D__unused= -D_GNU_SOURCE -D__USE_FILE_OFFSET64 -DNOMD5 \
+ -DUSE_PTHREADS -pthread *.o -o ~/bin/cpdup
BACKUP SCRIPT MODIFICATIONS - you will almost certainly have to adjust
the do_cleanup script to extract the proper field(s) from the df output.
Index: cpdup.c
===================================================================
RCS file: /home/cvs/src/bin/cpdup/cpdup.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -L bin/cpdup/cpdup.c -L bin/cpdup/cpdup.c -u -r1.2 -r1.3
--- bin/cpdup/cpdup.c
+++ bin/cpdup/cpdup.c
@@ -65,6 +65,12 @@
#define HLSIZE 8192
#define HLMASK (HLSIZE - 1)
+#define MAXDEPTH 32 /* max copy depth for thread */
+#define GETBUFSIZE 8192
+#define GETPATHSIZE 2048
+#define GETLINKSIZE 1024
+#define GETIOSIZE 65536
+
#ifndef _ST_FLAGS_PRESENT_
#define st_flags st_mode
#endif
@@ -84,6 +90,7 @@
struct hlink {
ino_t ino;
ino_t dino;
+ int refs;
struct hlink *next;
struct hlink *prev;
nlink_t nlinked;
@@ -115,11 +122,12 @@
static int validate_check(const char *spath, const char *dpath);
static int shash(const char *s);
static void hltdelete(struct hlink *);
+static void hltsetdino(struct hlink *, ino_t);
int YesNo(const char *path);
static int xrename(const char *src, const char *dst, u_long flags);
static int xlink(const char *src, const char *dst, u_long flags);
int WildCmp(const char *s1, const char *s2);
-static int DoCopy(copy_info_t info);
+static int DoCopy(copy_info_t info, int depth);
int AskConfirmation = 1;
int SafetyOpt = 1;
@@ -131,14 +139,14 @@
int UseMD5Opt;
int UseFSMIDOpt;
int SummaryOpt;
+int CompressOpt;
int SlaveOpt;
int EnableDirectoryRetries;
int DstBaseLen;
int ValidateOpt;
int CurParallel;
int MaxParallel = -1;
-char IOBuf1[65536];
-char IOBuf2[65536];
+int HardLinkCount;
const char *UseCpFile;
const char *UseHLPath;
const char *MD5CacheFile;
@@ -173,8 +181,10 @@
signal(SIGPIPE, SIG_IGN);
#if USE_PTHREADS
- pthread_mutex_init(&SrcHost.read_mutex, NULL);
- pthread_mutex_init(&DstHost.read_mutex, NULL);
+ for (i = 0; i < HCTHASH_SIZE; ++i) {
+ pthread_mutex_init(&SrcHost.hct_mutex[i], NULL);
+ pthread_mutex_init(&DstHost.hct_mutex[i], NULL);
+ }
pthread_mutex_init(&MasterMutex, NULL);
pthread_mutex_lock(&MasterMutex);
#endif
@@ -201,6 +211,9 @@
v = strtol(ptr, NULL, 0);
switch(ptr[-1]) {
+ case 'C':
+ CompressOpt = 1;
+ break;
case 'v':
VerboseOpt = 1;
while (*ptr == 'v') {
@@ -303,7 +316,7 @@
exit(1);
}
if (hc_connect(&SrcHost) < 0)
- fprintf(stderr, "Unable to connect to %s\n", SrcHost.host);
+ exit(1);
}
if (dst && (ptr = strchr(dst, ':')) != NULL) {
asprintf(&DstHost.host, "%*.*s", ptr - dst, ptr - dst, dst);
@@ -313,7 +326,7 @@
exit(1);
}
if (hc_connect(&DstHost) < 0)
- fprintf(stderr, "Unable to connect to %s\n", DstHost.host);
+ exit(1);
}
/*
@@ -324,6 +337,7 @@
fatal(NULL);
/* not reached */
}
+ bzero(&info, sizeof(info));
#if USE_PTHREADS
info.r = 0;
info.children = 0;
@@ -335,13 +349,13 @@
info.dpath = dst;
info.sdevNo = (dev_t)-1;
info.ddevNo = (dev_t)-1;
- i = DoCopy(&info);
+ i = DoCopy(&info, -1);
} else {
info.spath = src;
info.dpath = NULL;
info.sdevNo = (dev_t)-1;
info.ddevNo = (dev_t)-1;
- i = DoCopy(&info);
+ i = DoCopy(&info, -1);
}
#if USE_PTHREADS
pthread_cond_destroy(&info.cond);
@@ -352,7 +366,7 @@
fsmid_flush();
if (SummaryOpt && i == 0) {
- long duration;
+ double duration;
struct timeval end;
gettimeofday(&end, NULL);
@@ -364,10 +378,10 @@
CountWriteBytes += sizeof(struct stat) * CountRemovedItems;
#endif
- duration = end.tv_sec - start.tv_sec;
- duration *= 1000000;
- duration += end.tv_usec - start.tv_usec;
- if (duration == 0) duration = 1;
+ duration = (end.tv_sec - start.tv_sec);
+ duration += (double)(end.tv_usec - start.tv_usec) / 1000000.0;
+ if (duration == 0.0)
+ duration = 1.0;
logstd("cpdup completed successfully\n");
logstd("%lld bytes source, %lld src bytes read, %lld tgt bytes read\n"
"%lld bytes written (%.1fX speedup)\n",
@@ -383,9 +397,9 @@
(long long)CountLinkedItems,
(long long)CountRemovedItems);
logstd("%.1f seconds %5d Kbytes/sec synced %5d Kbytes/sec scanned\n",
- (float)duration / (float)1000000,
- (long)((long)1000000 * (CountSourceReadBytes + CountTargetReadBytes + CountWriteBytes) / duration / 1024.0),
- (long)((long)1000000 * CountSourceBytes / duration / 1024.0));
+ duration,
+ (int)((CountSourceReadBytes + CountTargetReadBytes + CountWriteBytes) / duration / 1024.0),
+ (int)(CountSourceBytes / duration / 1024.0));
}
exit((i == 0) ? 0 : 1);
}
@@ -393,14 +407,36 @@
static struct hlink *
hltlookup(struct stat *stp)
{
+#if USE_PTHREADS
+ struct timespec ts = { 0, 100000 };
+#endif
struct hlink *hl;
int n;
n = stp->st_ino & HLMASK;
- for (hl = hltable[n]; hl; hl = hl->next)
- if (hl->ino == stp->st_ino)
- return hl;
+#if USE_PTHREADS
+again:
+#endif
+ for (hl = hltable[n]; hl; hl = hl->next) {
+ if (hl->ino == stp->st_ino) {
+#if USE_PTHREADS
+ /*
+ * If the hl entry is still in the process of being created
+ * by another thread we have to wait until it has either been
+ * deleted or completed.
+ */
+ if (hl->refs) {
+ pthread_mutex_unlock(&MasterMutex);
+ nanosleep(&ts, NULL);
+ pthread_mutex_lock(&MasterMutex);
+ goto again;
+ }
+#endif
+ ++hl->refs;
+ return hl;
+ }
+ }
return NULL;
}
@@ -417,10 +453,12 @@
fprintf(stderr, "out of memory\n");
exit(EXIT_FAILURE);
}
+ ++HardLinkCount;
/* initialize and link the new element into the table */
new->ino = stp->st_ino;
- new->dino = 0;
+ new->dino = (ino_t)-1;
+ new->refs = 1;
bcopy(path, new->name, plen + 1);
new->nlinked = 1;
new->prev = NULL;
@@ -434,8 +472,16 @@
}
static void
+hltsetdino(struct hlink *hl, ino_t inum)
+{
+ hl->dino = inum;
+}
+
+static void
hltdelete(struct hlink *hl)
{
+ assert(hl->refs == 1);
+ --hl->refs;
if (hl->prev) {
if (hl->next)
hl->next->prev = hl->prev;
@@ -446,10 +492,17 @@
hltable[hl->ino & HLMASK] = hl->next;
}
-
+ --HardLinkCount;
free(hl);
}
+static void
+hltrels(struct hlink *hl)
+{
+ assert(hl->refs == 1);
+ --hl->refs;
+}
+
/*
* If UseHLPath is defined check to see if the file in question is
* the same as the source file, and if it is return a pointer to the
@@ -508,17 +561,21 @@
if (fd1 >= 0 && fd2 >= 0) {
int n;
int x;
+ char *iobuf1 = malloc(GETIOSIZE);
+ char *iobuf2 = malloc(GETIOSIZE);
- while ((n = hc_read(&SrcHost, fd1, IOBuf1, sizeof(IOBuf1))) > 0) {
+ while ((n = hc_read(&SrcHost, fd1, iobuf1, GETIOSIZE)) > 0) {
CountSourceReadBytes += n;
- x = hc_read(&DstHost, fd2, IOBuf2, sizeof(IOBuf2));
+ x = hc_read(&DstHost, fd2, iobuf2, GETIOSIZE);
if (x > 0)
CountTargetReadBytes += x;
if (x != n)
break;
- if (bcmp(IOBuf1, IOBuf2, n) != 0)
+ if (bcmp(iobuf1, iobuf2, n) != 0)
break;
}
+ free(iobuf1);
+ free(iobuf2);
if (n == 0)
error = 0;
}
@@ -536,27 +593,32 @@
copy_info_t cinfo = arg;
char *spath = cinfo->spath;
char *dpath = cinfo->dpath;
+ int r;
+ r = pthread_detach(pthread_self());
+ assert(r == 0);
pthread_cond_init(&cinfo->cond, NULL);
pthread_mutex_lock(&MasterMutex);
- cinfo->r += DoCopy(cinfo);
+ cinfo->r += DoCopy(cinfo, 0);
/* cinfo arguments invalid on return */
--cinfo->parent->children;
--CurParallel;
pthread_cond_signal(&cinfo->parent->cond);
- pthread_mutex_unlock(&MasterMutex);
free(spath);
if (dpath)
free(dpath);
pthread_cond_destroy(&cinfo->cond);
free(cinfo);
+ hcc_free_trans(&SrcHost);
+ hcc_free_trans(&DstHost);
+ pthread_mutex_unlock(&MasterMutex);
return(NULL);
}
#endif
int
-DoCopy(copy_info_t info)
+DoCopy(copy_info_t info, int depth)
{
const char *spath = info->spath;
const char *dpath = info->dpath;
@@ -602,8 +664,10 @@
*/
if (VerboseOpt >= 3)
logstd("%-32s nochange\n", (dpath) ? dpath : spath);
- if (hln->nlinked == st1.st_nlink)
+ if (hln->nlinked == st1.st_nlink) {
hltdelete(hln);
+ hln = NULL;
+ }
CountSourceItems++;
r = 0;
goto done;
@@ -615,7 +679,9 @@
logerr("%-32s hardlink: unable to unlink: %s\n",
((dpath) ? dpath : spath), strerror(errno));
hltdelete(hln);
- return (r + 1);
+ hln = NULL;
+ ++r;
+ goto done;
}
}
}
@@ -711,7 +777,7 @@
validate_check(spath, dpath) == 0)
) {
if (hln)
- hln->dino = st2.st_ino;
+ hltsetdino(hln, st2.st_ino);
if (VerboseOpt >= 3) {
#ifndef NOMD5
if (UseMD5Opt)
@@ -812,7 +878,7 @@
if (UseCpFile) {
FILE *fi;
- char buf[8192];
+ char *buf = malloc(GETBUFSIZE);
char *fpath;
if (UseCpFile[0] == '/') {
@@ -822,7 +888,7 @@
}
AddList(list, strrchr(fpath, '/') + 1, 1);
if ((fi = fopen(fpath, "r")) != NULL) {
- while (fgets(buf, sizeof(buf), fi) != NULL) {
+ while (fgets(buf, GETBUFSIZE, fi) != NULL) {
int l = strlen(buf);
CountSourceReadBytes += l;
if (l && buf[l-1] == '\n')
@@ -833,6 +899,7 @@
fclose(fi);
}
free(fpath);
+ free(buf);
}
/*
@@ -870,9 +937,9 @@
ndpath = mprintf("%s/%s", dpath, den->d_name);
#if USE_PTHREADS
- if (CurParallel < MaxParallel) {
+ if (CurParallel < MaxParallel || depth > MAXDEPTH) {
copy_info_t cinfo = malloc(sizeof(*cinfo));
- pthread_t dummy_thr = NULL;
+ pthread_t dummy_thr;
bzero(cinfo, sizeof(*cinfo));
cinfo->spath = nspath;
@@ -890,10 +957,15 @@
info->dpath = ndpath;
info->sdevNo = sdevNo;
info->ddevNo = ddevNo;
- r += DoCopy(info);
+ if (depth < 0)
+ r += DoCopy(info, depth);
+ else
+ r += DoCopy(info, depth + 1);
free(nspath);
if (ndpath)
free(ndpath);
+ info->spath = NULL;
+ info->dpath = NULL;
}
}
@@ -992,7 +1064,7 @@
int fd1;
int fd2;
- path = mprintf("%s.tmp", dpath);
+ path = mprintf("%s.tmp%d", dpath, (int)getpid());
/*
* Handle check failure message.
@@ -1047,15 +1119,16 @@
}
if (fd2 >= 0) {
const char *op;
+ char *iobuf1 = malloc(GETIOSIZE);
int n;
/*
* Matt: What about holes?
*/
op = "read";
- while ((n = hc_read(&SrcHost, fd1, IOBuf1, sizeof(IOBuf1))) > 0) {
+ while ((n = hc_read(&SrcHost, fd1, iobuf1, GETIOSIZE)) > 0) {
op = "write";
- if (hc_write(&DstHost, fd2, IOBuf1, n) != n)
+ if (hc_write(&DstHost, fd2, iobuf1, n) != n)
break;
op = "read";
}
@@ -1095,6 +1168,7 @@
hc_remove(&DstHost, path);
++r;
}
+ free(iobuf1);
} else {
logerr("%-32s create (uid %d, euid %d) failed: %s\n",
(dpath ? dpath : spath), getuid(), geteuid(),
@@ -1114,21 +1188,23 @@
free(path);
if (hln) {
- if (!r && hc_stat(&DstHost, dpath, &st2) == 0)
- hln->dino = st2.st_ino;
- else
+ if (!r && hc_stat(&DstHost, dpath, &st2) == 0) {
+ hltsetdino(hln, st2.st_ino);
+ } else {
hltdelete(hln);
+ hln = NULL;
+ }
}
} else if (S_ISLNK(st1.st_mode)) {
- char link1[1024];
- char link2[1024];
- char path[2048];
+ char *link1 = malloc(GETLINKSIZE);
+ char *link2 = malloc(GETLINKSIZE);
+ char *path = malloc(GETPATHSIZE);
int n1;
int n2;
- snprintf(path, sizeof(path), "%s.tmp", dpath);
- n1 = hc_readlink(&SrcHost, spath, link1, sizeof(link1) - 1);
- n2 = hc_readlink(&DstHost, dpath, link2, sizeof(link2) - 1);
+ snprintf(path, GETPATHSIZE, "%s.tmp%d", dpath, (int)getpid());
+ n1 = hc_readlink(&SrcHost, spath, link1, GETLINKSIZE - 1);
+ n2 = hc_readlink(&DstHost, dpath, link2, GETLINKSIZE - 1);
if (n1 >= 0) {
if (ForceOpt || n1 != n2 || bcmp(link1, link2, n1) != 0) {
hc_umask(&DstHost, ~st1.st_mode);
@@ -1170,8 +1246,11 @@
r = 1;
logerr("%-32s softlink-failed\n", (dpath ? dpath : spath));
}
+ free(link1);
+ free(link2);
+ free(path);
} else if ((S_ISCHR(st1.st_mode) || S_ISBLK(st1.st_mode)) && DeviceOpt) {
- char path[2048];
+ char *path = malloc(GETPATHSIZE);
if (ForceOpt ||
st2Valid == 0 ||
@@ -1180,10 +1259,10 @@
st1.st_uid != st2.st_uid ||
st1.st_gid != st2.st_gid
) {
- snprintf(path, sizeof(path), "%s.tmp", dpath);
+ snprintf(path, GETPATHSIZE, "%s.tmp%d", dpath, (int)getpid());
hc_remove(&DstHost, path);
- if (mknod(path, st1.st_mode, st1.st_rdev) == 0) {
+ if (hc_mknod(&DstHost, path, st1.st_mode, st1.st_rdev) == 0) {
hc_chmod(&DstHost, path, st1.st_mode);
hc_chown(&DstHost, path, st1.st_uid, st1.st_gid);
hc_remove(&DstHost, dpath);
@@ -1206,9 +1285,18 @@
if (VerboseOpt >= 3)
logstd("%-32s nochange\n", (dpath ? dpath : spath));
}
+ free(path);
CountSourceItems++;
}
done:
+ if (hln) {
+ if (hln->dino == (ino_t)-1) {
+ hltdelete(hln);
+ /*hln = NULL; unneeded */
+ } else {
+ hltrels(hln);
+ }
+ }
ResetList(list);
free(list);
return (r);
More information about the Midnightbsd-cvs
mailing list