1 /*        $NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $     */
2 
3 /*-
4  * Copyright (c)2010,2011 YAMAMOTO Takashi,
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28 
29 /*
30  * backend db operations
31  */
32 
33 #include <sys/cdefs.h>
34 #ifndef lint
35 __RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $");
36 #endif /* not lint */
37 
38 #include <assert.h>
39 #include <err.h>
40 #include <errno.h>
41 #include <inttypes.h>
42 #include <puffs.h>
43 #include <stdbool.h>
44 #include <stdarg.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <util.h>
48 
49 #include <libpq-fe.h>
50 
51 #include "pgfs_db.h"
52 #include "pgfs_waitq.h"
53 #include "pgfs_debug.h"
54 
/*
 * pgfs_dosync: starts out false; pgfs_connectdb sets it to true when its
 * "synchronous" argument is true.  commit_sync asserts that it is false.
 */
bool pgfs_dosync = false;
56 
/*
 * Xconn: the state kept for one backend database connection.
 */
struct Xconn {
	TAILQ_ENTRY(Xconn) list;	/* linkage on xclist */
	PGconn *conn;			/* libpq connection handle */
	struct puffs_cc *blocker;	/* cc yielded in pqwait, or NULL */
	struct puffs_cc *owner;		/* cc which took this in getxc, or NULL */
	bool in_trans;			/* set by begin*, cleared by relxc */
	int id;				/* bit index into cmd->prepared_mask */
	const char *label;		/* last label applied by setlabel */
};
66 
67 static void
dumperror(struct Xconn * xc,const PGresult * res)68 dumperror(struct Xconn *xc, const PGresult *res)
69 {
70           static const struct {
71                     const char *name;
72                     int code;
73           } fields[] = {
74 #define F(x)        { .name = #x, .code = x, }
75                     F(PG_DIAG_SEVERITY),
76                     F(PG_DIAG_SQLSTATE),
77                     F(PG_DIAG_MESSAGE_PRIMARY),
78                     F(PG_DIAG_MESSAGE_DETAIL),
79                     F(PG_DIAG_MESSAGE_HINT),
80                     F(PG_DIAG_STATEMENT_POSITION),
81                     F(PG_DIAG_INTERNAL_POSITION),
82                     F(PG_DIAG_INTERNAL_QUERY),
83                     F(PG_DIAG_CONTEXT),
84                     F(PG_DIAG_SOURCE_FILE),
85                     F(PG_DIAG_SOURCE_LINE),
86                     F(PG_DIAG_SOURCE_FUNCTION),
87 #undef F
88           };
89           unsigned int i;
90 
91           if (!pgfs_dodprintf) {
92                     return;
93           }
94           assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
95               PQresultStatus(res) == PGRES_FATAL_ERROR);
96           for (i = 0; i < __arraycount(fields); i++) {
97                     const char *val = PQresultErrorField(res, fields[i].code);
98 
99                     if (val == NULL) {
100                               continue;
101                     }
102                     fprintf(stderr, "%s: %s\n", fields[i].name, val);
103           }
104 }
105 
/* every backend connection; searched by getxc and pgfs_readframe */
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
/* ccs waiting in getxc for a free connection; relxc wakes one of them */
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
108 
109 static struct Xconn *
getxc(struct puffs_cc * cc)110 getxc(struct puffs_cc *cc)
111 {
112           struct Xconn *xc;
113 
114           assert(cc != NULL);
115 retry:
116           TAILQ_FOREACH(xc, &xclist, list) {
117                     if (xc->blocker == NULL) {
118                               assert(xc->owner == NULL);
119                               xc->owner = cc;
120                               DPRINTF("xc %p acquire %p\n", xc, cc);
121                               return xc;
122                     } else {
123                               assert(xc->owner == xc->blocker);
124                     }
125           }
126           DPRINTF("no free conn %p\n", cc);
127           waiton(&xcwaitq, cc);
128           goto retry;
129 }
130 
131 static void
relxc(struct Xconn * xc)132 relxc(struct Xconn *xc)
133 {
134 
135           assert(xc->in_trans);
136           assert(xc->owner != NULL);
137           xc->in_trans = false;
138           xc->owner = NULL;
139           wakeup_one(&xcwaitq);
140 }
141 
142 static void
pqwait(struct Xconn * xc)143 pqwait(struct Xconn *xc)
144 {
145           PGconn *conn = xc->conn;
146           struct puffs_cc *cc = xc->owner;
147 
148           if (PQflush(conn)) {
149                     errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
150           }
151           if (!PQisBusy(conn)) {
152                     return;
153           }
154           assert(xc->blocker == NULL);
155           xc->blocker = cc;
156           DPRINTF("yielding %p\n", cc);
157           /* XXX is it safe to yield before entering mainloop? */
158           puffs_cc_yield(cc);
159           DPRINTF("yield returned %p\n", cc);
160           assert(xc->owner == cc);
161           assert(xc->blocker == cc);
162           xc->blocker = NULL;
163 }
164 
/*
 * sqltoerrno: map a 5-character SQLSTATE code to an errno value.
 *
 * sqlstate is normally what PQresultErrorField(res, PG_DIAG_SQLSTATE)
 * returned.  That function returns NULL when the result carries no such
 * field, so a NULL sqlstate is accepted here and mapped to EIO, just
 * like a code which is not in the table.
 *
 * Returns 0 only for "00000"; every other input yields a non-zero errno.
 * A code which maps to EINVAL is treated as a bug and aborts.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];	/* not NUL-terminated */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/*
		 * the error result has no SQLSTATE field; don't hand a
		 * NULL pointer to memcmp.
		 */
		DPRINTF("missing sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
211 
/*
 * cmd: an SQL command and its parameter types, as built by createcmd.
 */
struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;	/* number of '$' found in the query string */
	Oid *paramtypes;	/* nparams entries, taken from createcmd's varargs */
	uint32_t prepared_mask;	/* for which connections this is prepared? */
	unsigned int flags;	/* CMD_ flags */
};

#define	CMD_NOPREPARE	1	/* don't prepare this command */
222 
223 struct cmd *
createcmd(const char * cmd,unsigned int flags,...)224 createcmd(const char *cmd, unsigned int flags, ...)
225 {
226           struct cmd *c;
227           va_list ap;
228           const char *cp;
229           unsigned int i;
230           static unsigned int cmdid;
231 
232           c = emalloc(sizeof(*c));
233           c->cmd = estrdup(cmd);
234           c->nparams = 0;
235           va_start(ap, flags);
236           for (cp = cmd; *cp != 0; cp++) {
237                     if (*cp == '$') { /* XXX */
238                               c->nparams++;
239                     }
240           }
241           c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
242           for (i = 0; i < c->nparams; i++) {
243                     Oid type = va_arg(ap, Oid);
244                     assert(type == BYTEA ||
245                         type == INT4OID || type == INT8OID || type == OIDOID ||
246                         type == TEXTOID || type == TIMESTAMPTZOID);
247                     c->paramtypes[i] = type;
248           }
249           va_end(ap);
250           snprintf(c->name, sizeof(c->name), "%u", cmdid++);
251           if ((flags & CMD_NOPREPARE) != 0) {
252                     c->prepared_mask = ~0;
253           } else {
254                     c->prepared_mask = 0;
255           }
256           c->flags = flags;
257           return c;
258 }
259 
260 static void
freecmd(struct cmd * c)261 freecmd(struct cmd *c)
262 {
263 
264           free(c->paramtypes);
265           free(c->cmd);
266           free(c);
267 }
268 
269 static int
fetch_noresult(struct Xconn * xc)270 fetch_noresult(struct Xconn *xc)
271 {
272           PGresult *res;
273           ExecStatusType status;
274           PGconn *conn = xc->conn;
275           int error;
276 
277           pqwait(xc);
278           res = PQgetResult(conn);
279           if (res == NULL) {
280                     return ENOENT;
281           }
282           status = PQresultStatus(res);
283           if (status == PGRES_COMMAND_OK) {
284                     assert(PQnfields(res) == 0);
285                     assert(PQntuples(res) == 0);
286                     if (!strcmp(PQcmdTuples(res), "0")) {
287                               error = ENOENT;
288                     } else {
289                               error = 0;
290                     }
291           } else if (status == PGRES_FATAL_ERROR) {
292                     error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
293                     assert(error != 0);
294                     dumperror(xc, res);
295           } else {
296                     errx(1, "%s not command_ok: %d: %s", __func__,
297                         (int)status,
298                         PQerrorMessage(conn));
299           }
300           PQclear(res);
301           res = PQgetResult(conn);
302           assert(res == NULL);
303           if (error != 0) {
304                     DPRINTF("error %d\n", error);
305           }
306           return error;
307 }
308 
309 static int
preparecmd(struct Xconn * xc,struct cmd * c)310 preparecmd(struct Xconn *xc, struct cmd *c)
311 {
312           PGconn *conn = xc->conn;
313           const uint32_t mask = 1 << xc->id;
314           int error;
315           int ret;
316 
317           if ((c->prepared_mask & mask) != 0) {
318                     return 0;
319           }
320           assert((c->flags & CMD_NOPREPARE) == 0);
321           DPRINTF("PREPARE: '%s'\n", c->cmd);
322           ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
323           if (!ret) {
324                     errx(EXIT_FAILURE, "PQsendPrepare: %s",
325                         PQerrorMessage(conn));
326           }
327           error = fetch_noresult(xc);
328           if (error != 0) {
329                     return error;
330           }
331           c->prepared_mask |= mask;
332           return 0;
333 }
334 
335 /*
336  * vsendcmd:
337  *
338  * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
339  * 0 for text and 1 for binary.
340  */
341 
342 static int
vsendcmd(struct Xconn * xc,int resultmode,struct cmd * c,va_list ap)343 vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
344 {
345           PGconn *conn = xc->conn;
346           char **paramvalues;
347           int *paramlengths;
348           int *paramformats;
349           unsigned int i;
350           int error;
351           int ret;
352 
353           assert(xc->owner != NULL);
354           assert(xc->blocker == NULL);
355           error = preparecmd(xc, c);
356           if (error != 0) {
357                     return error;
358           }
359           paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
360           paramlengths = NULL;
361           paramformats = NULL;
362           DPRINTF("CMD: '%s'\n", c->cmd);
363           for (i = 0; i < c->nparams; i++) {
364                     Oid type = c->paramtypes[i];
365                     char tmpstore[1024];
366                     const char *buf = NULL;
367                     intmax_t v = 0; /* XXXgcc */
368                     int sz;
369                     bool binary = false;
370 
371                     switch (type) {
372                     case BYTEA:
373                               buf = va_arg(ap, const void *);
374                               sz = (int)va_arg(ap, size_t);
375                               binary = true;
376                               break;
377                     case INT8OID:
378                     case OIDOID:
379                     case INT4OID:
380                               switch (type) {
381                               case INT8OID:
382                                         v = (intmax_t)va_arg(ap, int64_t);
383                                         break;
384                               case OIDOID:
385                                         v = (intmax_t)va_arg(ap, Oid);
386                                         break;
387                               case INT4OID:
388                                         v = (intmax_t)va_arg(ap, int32_t);
389                                         break;
390                               default:
391                                         errx(EXIT_FAILURE, "unknown integer oid %u",
392                                             type);
393                               }
394                               buf = tmpstore;
395                               sz = snprintf(tmpstore, sizeof(tmpstore),
396                                   "%jd", v);
397                               assert(sz != -1);
398                               assert((size_t)sz < sizeof(tmpstore));
399                               sz += 1;
400                               break;
401                     case TEXTOID:
402                     case TIMESTAMPTZOID:
403                               buf = va_arg(ap, char *);
404                               sz = strlen(buf) + 1;
405                               break;
406                     default:
407                               errx(EXIT_FAILURE, "%s: unknown param type %u",
408                                   __func__, type);
409                     }
410                     if (binary) {
411                               if (paramlengths == NULL) {
412                                         paramlengths =
413                                             emalloc(c->nparams * sizeof(*paramformats));
414                               }
415                               if (paramformats == NULL) {
416                                         paramformats = ecalloc(1,
417                                             c->nparams * sizeof(*paramformats));
418                               }
419                               paramformats[i] = 1;
420                               paramlengths[i] = sz;
421                     }
422                     paramvalues[i] = emalloc(sz);
423                     memcpy(paramvalues[i], buf, sz);
424                     if (binary) {
425                               DPRINTF("\t[%u]=<BINARY>\n", i);
426                     } else {
427                               DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
428                     }
429           }
430           if ((c->flags & CMD_NOPREPARE) != 0) {
431                     ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
432                         (const char * const *)paramvalues, paramlengths,
433                         paramformats, resultmode);
434           } else {
435                     ret = PQsendQueryPrepared(conn, c->name, c->nparams,
436                         (const char * const *)paramvalues, paramlengths,
437                         paramformats, resultmode);
438           }
439           for (i = 0; i < c->nparams; i++) {
440                     free(paramvalues[i]);
441           }
442           free(paramvalues);
443           free(paramlengths);
444           free(paramformats);
445           if (!ret) {
446                     errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
447                         PQerrorMessage(conn));
448           }
449           return 0;
450 }
451 
/*
 * sendcmd: send a command, asking for the result in text format (0).
 * The variable arguments are the command's parameters; see vsendcmd.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv;
}
463 
/*
 * sendcmdx: like sendcmd, but the caller chooses the result format
 * (resultmode is handed to vsendcmd unchanged).
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);
	return rv;
}
475 
/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 *
 * Returns the error from sending the command if that fails, otherwise
 * whatever fetch_noresult returns.
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	if (rv == 0) {
		rv = fetch_noresult(xc);
	}
	return rv;
}
495 
496 void
fetchinit(struct fetchstatus * s,struct Xconn * xc)497 fetchinit(struct fetchstatus *s, struct Xconn *xc)
498 {
499           s->xc = xc;
500           s->res = NULL;
501           s->cur = 0;
502           s->nrows = 0;
503           s->done = false;
504 }
505 
/*
 * getint: convert a decimal string to an integer.
 *
 * The string must be non-empty, must be consumed completely by
 * strtoimax, and the conversion must not set errno; all three are
 * checked with assert only.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t value;

	errno = 0;
	value = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(str[0] != 0);
	assert(*end == 0);
	return value;
}
519 
520 static int
vfetchnext(struct fetchstatus * s,unsigned int n,const Oid * types,va_list ap)521 vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
522 {
523           PGconn *conn = s->xc->conn;
524           unsigned int i;
525 
526           assert(conn != NULL);
527           if (s->res == NULL) {
528                     ExecStatusType status;
529                     int error;
530 
531                     pqwait(s->xc);
532                     s->res = PQgetResult(conn);
533                     if (s->res == NULL) {
534                               s->done = true;
535                               return ENOENT;
536                     }
537                     status = PQresultStatus(s->res);
538                     if (status == PGRES_FATAL_ERROR) {
539                               error = sqltoerrno(
540                                   PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
541                               assert(error != 0);
542                               dumperror(s->xc, s->res);
543                               return error;
544                     }
545                     if (status != PGRES_TUPLES_OK) {
546                               errx(1, "not tuples_ok: %s",
547                                   PQerrorMessage(conn));
548                     }
549                     assert((unsigned int)PQnfields(s->res) == n);
550                     s->nrows = PQntuples(s->res);
551                     if (s->nrows == 0) {
552                               DPRINTF("no rows\n");
553                               return ENOENT;
554                     }
555                     assert(s->nrows >= 1);
556                     s->cur = 0;
557           }
558           for (i = 0; i < n; i++) {
559                     size_t size;
560 
561                     assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
562                     DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
563                         i, PQftype(s->res, i), types[i],
564                         PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
565                         PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
566                         "<BINARY>");
567                     assert(PQftype(s->res, i) == types[i]);
568                     assert(!PQgetisnull(s->res, s->cur, i));
569                     switch(types[i]) {
570                     case INT8OID:
571                               *va_arg(ap, int64_t *) =
572                                   getint(PQgetvalue(s->res, s->cur, i));
573                               break;
574                     case OIDOID:
575                               *va_arg(ap, Oid *) =
576                                   getint(PQgetvalue(s->res, s->cur, i));
577                               break;
578                     case INT4OID:
579                               *va_arg(ap, int32_t *) =
580                                   getint(PQgetvalue(s->res, s->cur, i));
581                               break;
582                     case TEXTOID:
583                               *va_arg(ap, char **) =
584                                   estrdup(PQgetvalue(s->res, s->cur, i));
585                               break;
586                     case BYTEA:
587                               size = PQgetlength(s->res, s->cur, i);
588                               memcpy(va_arg(ap, void *),
589                                   PQgetvalue(s->res, s->cur, i), size);
590                               *va_arg(ap, size_t *) = size;
591                               break;
592                     default:
593                               errx(EXIT_FAILURE, "%s unknown type %u", __func__,
594                                   types[i]);
595                     }
596           }
597           s->cur++;
598           if (s->cur == s->nrows) {
599                     PQclear(s->res);
600                     s->res = NULL;
601           }
602           return 0;
603 }
604 
605 int
fetchnext(struct fetchstatus * s,unsigned int n,const Oid * types,...)606 fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
607 {
608           va_list ap;
609           int error;
610 
611           va_start(ap, types);
612           error = vfetchnext(s, n, types, ap);
613           va_end(ap);
614           return error;
615 }
616 
617 void
fetchdone(struct fetchstatus * s)618 fetchdone(struct fetchstatus *s)
619 {
620 
621           if (s->res != NULL) {
622                     PQclear(s->res);
623                     s->res = NULL;
624           }
625           if (!s->done) {
626                     PGresult *res;
627                     unsigned int n;
628 
629                     n = 0;
630                     while ((res = PQgetResult(s->xc->conn)) != NULL) {
631                               PQclear(res);
632                               n++;
633                     }
634                     if (n > 0) {
635                               DPRINTF("%u rows dropped\n", n);
636                     }
637           }
638 }
639 
640 int
simplefetch(struct Xconn * xc,Oid type,...)641 simplefetch(struct Xconn *xc, Oid type, ...)
642 {
643           struct fetchstatus s;
644           va_list ap;
645           int error;
646 
647           fetchinit(&s, xc);
648           va_start(ap, type);
649           error = vfetchnext(&s, 1, &type, ap);
650           va_end(ap);
651           assert(error != 0 || s.res == NULL);
652           fetchdone(&s);
653           return error;
654 }
655 
656 /*
657  * setlabel: set the descriptive label for the connection.
658  *
659  * we use simple pointer comparison for label equality check.
660  */
661 static void
setlabel(struct Xconn * xc,const char * label)662 setlabel(struct Xconn *xc, const char *label)
663 {
664           int error;
665 
666           /*
667            * put the label into application_name so that it's shown in
668            * pg_stat_activity.  we are sure that our labels don't need
669            * PQescapeStringConn.
670            *
671            * example:
672            *        SELECT pid,application_name,query FROM pg_stat_activity
673            *        WHERE state <> 'idle'
674            */
675 
676           if (label != NULL && label != xc->label) {
677                     struct cmd *c;
678                     char cmd_str[1024];
679 
680                     snprintf(cmd_str, sizeof(cmd_str),
681                         "SET application_name TO 'pgfs: %s'", label);
682                     c = createcmd(cmd_str, CMD_NOPREPARE);
683                     error = simplecmd(xc, c);
684                     freecmd(c);
685                     assert(error == 0);
686                     xc->label = label;
687           } else {
688 #if 0 /* don't bother to clear label */
689                     static struct cmd *c;
690 
691                     CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
692                     error = simplecmd(xc, c);
693                     assert(error == 0);
694 #endif
695           }
696 }
697 
698 struct Xconn *
begin(struct puffs_usermount * pu,const char * label)699 begin(struct puffs_usermount *pu, const char *label)
700 {
701           struct Xconn *xc = getxc(puffs_cc_getcc(pu));
702           static struct cmd *c;
703           int error;
704 
705           setlabel(xc, label);
706           CREATECMD_NOPARAM(c, "BEGIN");
707           assert(!xc->in_trans);
708           error = simplecmd(xc, c);
709           assert(error == 0);
710           assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
711           xc->in_trans = true;
712           return xc;
713 }
714 
715 struct Xconn *
begin_readonly(struct puffs_usermount * pu,const char * label)716 begin_readonly(struct puffs_usermount *pu, const char *label)
717 {
718           struct Xconn *xc = getxc(puffs_cc_getcc(pu));
719           static struct cmd *c;
720           int error;
721 
722           setlabel(xc, label);
723           CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
724           assert(!xc->in_trans);
725           error = simplecmd(xc, c);
726           assert(error == 0);
727           assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
728           xc->in_trans = true;
729           return xc;
730 }
731 
732 void
rollback(struct Xconn * xc)733 rollback(struct Xconn *xc)
734 {
735           PGTransactionStatusType status;
736 
737           /*
738            * check the status as we are not sure the status of our transaction
739            * after a failed commit.
740            */
741           status = PQtransactionStatus(xc->conn);
742           assert(status != PQTRANS_ACTIVE);
743           assert(status != PQTRANS_UNKNOWN);
744           if (status != PQTRANS_IDLE) {
745                     static struct cmd *c;
746                     int error;
747 
748                     assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
749                     CREATECMD_NOPARAM(c, "ROLLBACK");
750                     error = simplecmd(xc, c);
751                     assert(error == 0);
752           }
753           DPRINTF("xc %p rollback %p\n", xc, xc->owner);
754           setlabel(xc, NULL);
755           relxc(xc);
756 }
757 
758 int
commit(struct Xconn * xc)759 commit(struct Xconn *xc)
760 {
761           static struct cmd *c;
762           int error;
763 
764           CREATECMD_NOPARAM(c, "COMMIT");
765           error = simplecmd(xc, c);
766           setlabel(xc, NULL);
767           if (error == 0) {
768                     DPRINTF("xc %p commit %p\n", xc, xc->owner);
769                     relxc(xc);
770           }
771           return error;
772 }
773 
774 int
commit_sync(struct Xconn * xc)775 commit_sync(struct Xconn *xc)
776 {
777           static struct cmd *c;
778           int error;
779 
780           assert(!pgfs_dosync);
781           CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
782           error = simplecmd(xc, c);
783           assert(error == 0);
784           return commit(xc);
785 }
786 
787 static void
pgfs_notice_receiver(void * vp,const PGresult * res)788 pgfs_notice_receiver(void *vp, const PGresult *res)
789 {
790           struct Xconn *xc = vp;
791 
792           assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
793           fprintf(stderr, "got a notice on %p\n", xc);
794           dumperror(xc, res);
795 }
796 
797 static int
pgfs_readframe(struct puffs_usermount * pu,struct puffs_framebuf * pufbuf,int fd,int * done)798 pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
799     int fd, int *done)
800 {
801           struct Xconn *xc;
802           PGconn *conn;
803 
804           TAILQ_FOREACH(xc, &xclist, list) {
805                     if (PQsocket(xc->conn) == fd) {
806                               break;
807                     }
808           }
809           assert(xc != NULL);
810           conn = xc->conn;
811           PQconsumeInput(conn);
812           if (!PQisBusy(conn)) {
813                     if (xc->blocker != NULL) {
814                               DPRINTF("schedule %p\n", xc->blocker);
815                               puffs_cc_schedule(xc->blocker);
816                     } else {
817                               DPRINTF("no blockers\n");
818                     }
819           }
820           *done = 0;
821           return 0;
822 }
823 
824 int
pgfs_connectdb(struct puffs_usermount * pu,const char * dbname,const char * dbuser,bool debug,bool synchronous,unsigned int nconn)825 pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
826     const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
827 {
828           const char *keywords[3+1];
829           const char *values[3];
830           unsigned int i;
831 
832           if (nconn > 32) {
833                     /*
834                      * limit from sizeof(cmd->prepared_mask)
835                      */
836                     return EINVAL;
837           }
838           if (debug) {
839                     pgfs_dodprintf = true;
840           }
841           if (synchronous) {
842                     pgfs_dosync = true;
843           }
844           i = 0;
845           if (dbname != NULL) {
846                     keywords[i] = "dbname";
847                     values[i] = dbname;
848                     i++;
849           }
850           if (dbuser != NULL) {
851                     keywords[i] = "user";
852                     values[i] = dbuser;
853                     i++;
854           }
855           keywords[i] = "application_name";
856           values[i] = "pgfs";
857           i++;
858           keywords[i] = NULL;
859           puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
860           for (i = 0; i < nconn; i++) {
861                     struct Xconn *xc;
862                     struct Xconn *xc2;
863                     static int xcid;
864                     PGconn *conn;
865                     struct cmd *c;
866                     int error;
867 
868                     conn = PQconnectdbParams(keywords, values, 0);
869                     if (conn == NULL) {
870                               errx(EXIT_FAILURE,
871                                   "PQconnectdbParams: unknown failure");
872                     }
873                     if (PQstatus(conn) != CONNECTION_OK) {
874                               /*
875                                * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
876                                */
877                               errx(EXIT_FAILURE, "PQconnectdbParams: %s",
878                                   PQerrorMessage(conn));
879                     }
880                     DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
881                     puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
882                     xc = emalloc(sizeof(*xc));
883                     xc->conn = conn;
884                     xc->blocker = NULL;
885                     xc->owner = NULL;
886                     xc->in_trans = false;
887                     xc->id = xcid++;
888                     xc->label = NULL;
889                     assert(xc->id < 32);
890                     PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
891                     TAILQ_INSERT_HEAD(&xclist, xc, list);
892                     xc2 = begin(pu, NULL);
893                     assert(xc2 == xc);
894                     c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
895                     error = simplecmd(xc, c);
896                     assert(error == 0);
897                     freecmd(c);
898                     c = createcmd("SET SESSION CHARACTERISTICS AS "
899                         "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
900                         CMD_NOPREPARE);
901                     error = simplecmd(xc, c);
902                     assert(error == 0);
903                     freecmd(c);
904                     c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
905                     error = simplecmd(xc, c);
906                     assert(error == 0);
907                     freecmd(c);
908                     if (!pgfs_dosync) {
909                               c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
910                                   CMD_NOPREPARE);
911                               error = simplecmd(xc, c);
912                               assert(error == 0);
913                               freecmd(c);
914                     }
915                     if (debug) {
916                               struct fetchstatus s;
917                               static const Oid types[] = { INT8OID, };
918                               uint64_t pid;
919 
920                               c = createcmd("SELECT pg_backend_pid()::int8;",
921                                   CMD_NOPREPARE);
922                               error = sendcmd(xc, c);
923                               assert(error == 0);
924                               fetchinit(&s, xc);
925                               error = FETCHNEXT(&s, types, &pid);
926                               fetchdone(&s);
927                               assert(error == 0);
928                               DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
929                     }
930                     error = commit(xc);
931                     assert(error == 0);
932                     assert(xc->owner == NULL);
933           }
934           /*
935            * XXX cleanup unlinked files here?  what to do when the filesystem
936            * is shared?
937            */
938           return 0;
939 }
940 
941 struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
942 struct puffs_cc *flusher = NULL;
943 
944 int
flush_xacts(struct puffs_usermount * pu)945 flush_xacts(struct puffs_usermount *pu)
946 {
947           struct puffs_cc *cc = puffs_cc_getcc(pu);
948           struct Xconn *xc;
949           static struct cmd *c;
950           uint64_t dummy;
951           int error;
952 
953           /*
954            * flush all previously issued asynchronous transactions.
955            *
956            * XXX
957            * unfortunately it seems that there is no clean way to tell
958            * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
959            * too expensive and overkill for our purpose.
960            * besides, PostgreSQL has an optimization to skip XLOG flushing
961            * for transactions which didn't produce WAL records.
962            * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
963            * it means that an empty transaction ("BEGIN; COMMIT;"), which
964            * doesn't produce any WAL records, doesn't flush the XLOG even if
965            * synchronous_commit=on.  we issues a dummy setval() to avoid the
966            * optimization.
967            * on the other hand, we try to avoid creating unnecessary WAL activity
968            * by serializing flushing and checking XLOG locations.
969            */
970 
971           assert(!pgfs_dosync);
972           if (flusher != NULL) { /* serialize flushers */
973                     DPRINTF("%p flush in progress %p\n", cc, flusher);
974                     waiton(&flushwaitq, cc);
975                     assert(flusher == NULL);
976           }
977           DPRINTF("%p start flushing\n", cc);
978           flusher = cc;
979 retry:
980           xc = begin(pu, "flush");
981           CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
982               "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
983           error = sendcmd(xc, c);
984           if (error != 0) {
985                     goto got_error;
986           }
987           error = simplefetch(xc, INT8OID, &dummy);
988           assert(error != 0 || dummy == 1);
989           if (error == ENOENT) {
990                     /*
991                      * there seems to be nothing to flush.
992                      */
993                     DPRINTF("%p no sync\n", cc);
994                     error = 0;
995           }
996           if (error != 0) {
997                     goto got_error;
998           }
999           error = commit_sync(xc);
1000           if (error != 0) {
1001                     goto got_error;
1002           }
1003           goto done;
1004 got_error:
1005           rollback(xc);
1006           if (error == EAGAIN) {
1007                     goto retry;
1008           }
1009 done:
1010           assert(flusher == cc);
1011           flusher = NULL;
1012           wakeup_one(&flushwaitq);
1013           DPRINTF("%p end flushing error=%d\n", cc, error);
1014           return error;
1015 }
1016