1 /*-
2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
30
31 #include <sys/types.h>
32 #include <sys/event.h>
33 #include <sys/socket.h>
34 #include <sys/time.h>
35
36 #include <assert.h>
37 #include <errno.h>
38 #include <nsswitch.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42
43 #include "cachelib.h"
44 #include "config.h"
45 #include "debug.h"
46 #include "log.h"
47 #include "query.h"
48 #include "mp_rs_query.h"
49 #include "mp_ws_query.h"
50 #include "singletons.h"
51
/*
 * Forward declarations of the file-local query_state callbacks, grouped by
 * the phase of the read session they serve.
 * on_mp_read_session_request_read1 is deliberately not listed here: it is
 * defined below without the static qualifier.
 */

/* Session initiation. */
static int on_mp_read_session_request_read2(struct query_state *);
static int on_mp_read_session_request_process(struct query_state *);
static int on_mp_read_session_response_write1(struct query_state *);

/* Dispatching of the requests made within an open session. */
static int on_mp_read_session_mapper(struct query_state *);

/* Read requests. */
static int on_mp_read_session_read_request_process(struct query_state *);
static int on_mp_read_session_read_response_write1(struct query_state *);
static int on_mp_read_session_read_response_write2(struct query_state *);

/* Session termination and cleanup. */
static int on_mp_read_session_close_notification(struct query_state *);
static void on_mp_read_session_destroy(struct query_state *);
62
63 /*
64 * This function is used as the query_state's destroy_func to make the
65 * proper cleanup in case of errors.
66 */
67 static void
on_mp_read_session_destroy(struct query_state * qstate)68 on_mp_read_session_destroy(struct query_state *qstate)
69 {
70 TRACE_IN(on_mp_read_session_destroy);
71 finalize_comm_element(&qstate->request);
72 finalize_comm_element(&qstate->response);
73
74 if (qstate->mdata != NULL) {
75 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
76 close_cache_mp_read_session(
77 (cache_mp_read_session)qstate->mdata);
78 configuration_unlock_entry(qstate->config_entry,
79 CELT_MULTIPART);
80 }
81 TRACE_OUT(on_mp_read_session_destroy);
82 }
83
84 /*
85 * The functions below are used to process multipart read session initiation
86 * requests.
87 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
88 * the request itself
89 * - on_mp_read_session_request_process processes it
90 * - on_mp_read_session_response_write1 sends the response
91 */
92 int
on_mp_read_session_request_read1(struct query_state * qstate)93 on_mp_read_session_request_read1(struct query_state *qstate)
94 {
95 struct cache_mp_read_session_request *c_mp_rs_request;
96 ssize_t result;
97
98 TRACE_IN(on_mp_read_session_request_read1);
99 if (qstate->kevent_watermark == 0)
100 qstate->kevent_watermark = sizeof(size_t);
101 else {
102 init_comm_element(&qstate->request,
103 CET_MP_READ_SESSION_REQUEST);
104 c_mp_rs_request = get_cache_mp_read_session_request(
105 &qstate->request);
106
107 result = qstate->read_func(qstate,
108 &c_mp_rs_request->entry_length, sizeof(size_t));
109
110 if (result != sizeof(size_t)) {
111 TRACE_OUT(on_mp_read_session_request_read1);
112 return (-1);
113 }
114
115 if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
116 TRACE_OUT(on_mp_read_session_request_read1);
117 return (-1);
118 }
119
120 c_mp_rs_request->entry = calloc(1,
121 c_mp_rs_request->entry_length + 1);
122 assert(c_mp_rs_request->entry != NULL);
123
124 qstate->kevent_watermark = c_mp_rs_request->entry_length;
125 qstate->process_func = on_mp_read_session_request_read2;
126 }
127 TRACE_OUT(on_mp_read_session_request_read1);
128 return (0);
129 }
130
131 static int
on_mp_read_session_request_read2(struct query_state * qstate)132 on_mp_read_session_request_read2(struct query_state *qstate)
133 {
134 struct cache_mp_read_session_request *c_mp_rs_request;
135 ssize_t result;
136
137 TRACE_IN(on_mp_read_session_request_read2);
138 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
139
140 result = qstate->read_func(qstate, c_mp_rs_request->entry,
141 c_mp_rs_request->entry_length);
142
143 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
144 LOG_ERR_3("on_mp_read_session_request_read2",
145 "read failed");
146 TRACE_OUT(on_mp_read_session_request_read2);
147 return (-1);
148 }
149
150 qstate->kevent_watermark = 0;
151 qstate->process_func = on_mp_read_session_request_process;
152 TRACE_OUT(on_mp_read_session_request_read2);
153 return (0);
154 }
155
156 static int
on_mp_read_session_request_process(struct query_state * qstate)157 on_mp_read_session_request_process(struct query_state *qstate)
158 {
159 struct cache_mp_read_session_request *c_mp_rs_request;
160 struct cache_mp_read_session_response *c_mp_rs_response;
161 cache_mp_read_session rs;
162 cache_entry c_entry;
163 char *dec_cache_entry_name;
164
165 char *buffer;
166 size_t buffer_size;
167 cache_mp_write_session ws;
168 struct agent *lookup_agent;
169 struct multipart_agent *mp_agent;
170 void *mdata;
171 int res;
172
173 TRACE_IN(on_mp_read_session_request_process);
174 init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
175 c_mp_rs_response = get_cache_mp_read_session_response(
176 &qstate->response);
177 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
178
179 qstate->config_entry = configuration_find_entry(
180 s_configuration, c_mp_rs_request->entry);
181 if (qstate->config_entry == NULL) {
182 c_mp_rs_response->error_code = ENOENT;
183
184 LOG_ERR_2("read_session_request",
185 "can't find configuration entry '%s'."
186 " aborting request", c_mp_rs_request->entry);
187 goto fin;
188 }
189
190 if (qstate->config_entry->enabled == 0) {
191 c_mp_rs_response->error_code = EACCES;
192
193 LOG_ERR_2("read_session_request",
194 "configuration entry '%s' is disabled",
195 c_mp_rs_request->entry);
196 goto fin;
197 }
198
199 if (qstate->config_entry->perform_actual_lookups != 0)
200 dec_cache_entry_name = strdup(
201 qstate->config_entry->mp_cache_params.cep.entry_name);
202 else {
203 #ifdef NS_NSCD_EID_CHECKING
204 if (check_query_eids(qstate) != 0) {
205 c_mp_rs_response->error_code = EPERM;
206 goto fin;
207 }
208 #endif
209
210 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
211 qstate->config_entry->mp_cache_params.cep.entry_name);
212 }
213
214 assert(dec_cache_entry_name != NULL);
215
216 configuration_lock_rdlock(s_configuration);
217 c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
218 configuration_unlock(s_configuration);
219
220 if ((c_entry == INVALID_CACHE) &&
221 (qstate->config_entry->perform_actual_lookups != 0))
222 c_entry = register_new_mp_cache_entry(qstate,
223 dec_cache_entry_name);
224
225 free(dec_cache_entry_name);
226
227 if (c_entry != INVALID_CACHE_ENTRY) {
228 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
229 rs = open_cache_mp_read_session(c_entry);
230 configuration_unlock_entry(qstate->config_entry,
231 CELT_MULTIPART);
232
233 if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
234 (qstate->config_entry->perform_actual_lookups != 0)) {
235 lookup_agent = find_agent(s_agent_table,
236 c_mp_rs_request->entry, MULTIPART_AGENT);
237
238 if ((lookup_agent != NULL) &&
239 (lookup_agent->type == MULTIPART_AGENT)) {
240 mp_agent = (struct multipart_agent *)
241 lookup_agent;
242 mdata = mp_agent->mp_init_func();
243
244 /*
245 * Multipart agents read the whole snapshot
246 * of the data at one time.
247 */
248 configuration_lock_entry(qstate->config_entry,
249 CELT_MULTIPART);
250 ws = open_cache_mp_write_session(c_entry);
251 configuration_unlock_entry(qstate->config_entry,
252 CELT_MULTIPART);
253 if (ws != NULL) {
254 do {
255 buffer = NULL;
256 res = mp_agent->mp_lookup_func(&buffer,
257 &buffer_size,
258 mdata);
259
260 if ((res & NS_TERMINATE) &&
261 (buffer != NULL)) {
262 configuration_lock_entry(
263 qstate->config_entry,
264 CELT_MULTIPART);
265 if (cache_mp_write(ws, buffer,
266 buffer_size) != 0) {
267 abandon_cache_mp_write_session(ws);
268 ws = NULL;
269 }
270 configuration_unlock_entry(
271 qstate->config_entry,
272 CELT_MULTIPART);
273
274 free(buffer);
275 buffer = NULL;
276 } else {
277 configuration_lock_entry(
278 qstate->config_entry,
279 CELT_MULTIPART);
280 close_cache_mp_write_session(ws);
281 configuration_unlock_entry(
282 qstate->config_entry,
283 CELT_MULTIPART);
284
285 free(buffer);
286 buffer = NULL;
287 }
288 } while ((res & NS_TERMINATE) &&
289 (ws != NULL));
290 }
291
292 configuration_lock_entry(qstate->config_entry,
293 CELT_MULTIPART);
294 rs = open_cache_mp_read_session(c_entry);
295 configuration_unlock_entry(qstate->config_entry,
296 CELT_MULTIPART);
297 }
298 }
299
300 if (rs == INVALID_CACHE_MP_READ_SESSION)
301 c_mp_rs_response->error_code = -1;
302 else {
303 qstate->mdata = rs;
304 qstate->destroy_func = on_mp_read_session_destroy;
305
306 configuration_lock_entry(qstate->config_entry,
307 CELT_MULTIPART);
308 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
309 (qstate->config_entry->mp_query_timeout.tv_usec != 0))
310 memcpy(&qstate->timeout,
311 &qstate->config_entry->mp_query_timeout,
312 sizeof(struct timeval));
313 configuration_unlock_entry(qstate->config_entry,
314 CELT_MULTIPART);
315 }
316 } else
317 c_mp_rs_response->error_code = -1;
318
319 fin:
320 qstate->process_func = on_mp_read_session_response_write1;
321 qstate->kevent_watermark = sizeof(int);
322 qstate->kevent_filter = EVFILT_WRITE;
323
324 TRACE_OUT(on_mp_read_session_request_process);
325 return (0);
326 }
327
328 static int
on_mp_read_session_response_write1(struct query_state * qstate)329 on_mp_read_session_response_write1(struct query_state *qstate)
330 {
331 struct cache_mp_read_session_response *c_mp_rs_response;
332 ssize_t result;
333
334 TRACE_IN(on_mp_read_session_response_write1);
335 c_mp_rs_response = get_cache_mp_read_session_response(
336 &qstate->response);
337 result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
338 sizeof(int));
339
340 if (result != sizeof(int)) {
341 LOG_ERR_3("on_mp_read_session_response_write1",
342 "write failed");
343 TRACE_OUT(on_mp_read_session_response_write1);
344 return (-1);
345 }
346
347 if (c_mp_rs_response->error_code == 0) {
348 qstate->kevent_watermark = sizeof(int);
349 qstate->process_func = on_mp_read_session_mapper;
350 qstate->kevent_filter = EVFILT_READ;
351 } else {
352 qstate->kevent_watermark = 0;
353 qstate->process_func = NULL;
354 }
355 TRACE_OUT(on_mp_read_session_response_write1);
356 return (0);
357 }
358
359 /*
360 * Mapper function is used to avoid multiple connections for each session
361 * write or read requests. After processing the request, it does not close
362 * the connection, but waits for the next request.
363 */
364 static int
on_mp_read_session_mapper(struct query_state * qstate)365 on_mp_read_session_mapper(struct query_state *qstate)
366 {
367 ssize_t result;
368 int elem_type;
369
370 TRACE_IN(on_mp_read_session_mapper);
371 if (qstate->kevent_watermark == 0) {
372 qstate->kevent_watermark = sizeof(int);
373 } else {
374 result = qstate->read_func(qstate, &elem_type, sizeof(int));
375 if (result != sizeof(int)) {
376 LOG_ERR_3("on_mp_read_session_mapper",
377 "read failed");
378 TRACE_OUT(on_mp_read_session_mapper);
379 return (-1);
380 }
381
382 switch (elem_type) {
383 case CET_MP_READ_SESSION_READ_REQUEST:
384 qstate->kevent_watermark = 0;
385 qstate->process_func =
386 on_mp_read_session_read_request_process;
387 break;
388 case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
389 qstate->kevent_watermark = 0;
390 qstate->process_func =
391 on_mp_read_session_close_notification;
392 break;
393 default:
394 qstate->kevent_watermark = 0;
395 qstate->process_func = NULL;
396 LOG_ERR_3("on_mp_read_session_mapper",
397 "unknown element type");
398 TRACE_OUT(on_mp_read_session_mapper);
399 return (-1);
400 }
401 }
402 TRACE_OUT(on_mp_read_session_mapper);
403 return (0);
404 }
405
406 /*
407 * The functions below are used to process multipart read sessions read
408 * requests. User doesn't have to pass any kind of data, besides the
409 * request identificator itself. So we don't need any XXX_read functions and
410 * start with the XXX_process function.
411 * - on_mp_read_session_read_request_process processes it
412 * - on_mp_read_session_read_response_write1 and
413 * on_mp_read_session_read_response_write2 sends the response
414 */
415 static int
on_mp_read_session_read_request_process(struct query_state * qstate)416 on_mp_read_session_read_request_process(struct query_state *qstate)
417 {
418 struct cache_mp_read_session_read_response *read_response;
419
420 TRACE_IN(on_mp_read_session_response_process);
421 init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
422 read_response = get_cache_mp_read_session_read_response(
423 &qstate->response);
424
425 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
426 read_response->error_code = cache_mp_read(
427 (cache_mp_read_session)qstate->mdata, NULL,
428 &read_response->data_size);
429
430 if (read_response->error_code == 0) {
431 read_response->data = malloc(read_response->data_size);
432 assert(read_response != NULL);
433 read_response->error_code = cache_mp_read(
434 (cache_mp_read_session)qstate->mdata,
435 read_response->data,
436 &read_response->data_size);
437 }
438 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
439
440 if (read_response->error_code == 0)
441 qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
442 else
443 qstate->kevent_watermark = sizeof(int);
444 qstate->process_func = on_mp_read_session_read_response_write1;
445 qstate->kevent_filter = EVFILT_WRITE;
446
447 TRACE_OUT(on_mp_read_session_response_process);
448 return (0);
449 }
450
451 static int
on_mp_read_session_read_response_write1(struct query_state * qstate)452 on_mp_read_session_read_response_write1(struct query_state *qstate)
453 {
454 struct cache_mp_read_session_read_response *read_response;
455 ssize_t result;
456
457 TRACE_IN(on_mp_read_session_read_response_write1);
458 read_response = get_cache_mp_read_session_read_response(
459 &qstate->response);
460
461 result = qstate->write_func(qstate, &read_response->error_code,
462 sizeof(int));
463 if (read_response->error_code == 0) {
464 result += qstate->write_func(qstate, &read_response->data_size,
465 sizeof(size_t));
466 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
467 TRACE_OUT(on_mp_read_session_read_response_write1);
468 LOG_ERR_3("on_mp_read_session_read_response_write1",
469 "write failed");
470 return (-1);
471 }
472
473 qstate->kevent_watermark = read_response->data_size;
474 qstate->process_func = on_mp_read_session_read_response_write2;
475 } else {
476 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
477 LOG_ERR_3("on_mp_read_session_read_response_write1",
478 "write failed");
479 TRACE_OUT(on_mp_read_session_read_response_write1);
480 return (-1);
481 }
482
483 qstate->kevent_watermark = 0;
484 qstate->process_func = NULL;
485 }
486
487 TRACE_OUT(on_mp_read_session_read_response_write1);
488 return (0);
489 }
490
491 static int
on_mp_read_session_read_response_write2(struct query_state * qstate)492 on_mp_read_session_read_response_write2(struct query_state *qstate)
493 {
494 struct cache_mp_read_session_read_response *read_response;
495 ssize_t result;
496
497 TRACE_IN(on_mp_read_session_read_response_write2);
498 read_response = get_cache_mp_read_session_read_response(
499 &qstate->response);
500 result = qstate->write_func(qstate, read_response->data,
501 read_response->data_size);
502 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
503 LOG_ERR_3("on_mp_read_session_read_response_write2",
504 "write failed");
505 TRACE_OUT(on_mp_read_session_read_response_write2);
506 return (-1);
507 }
508
509 finalize_comm_element(&qstate->request);
510 finalize_comm_element(&qstate->response);
511
512 qstate->kevent_watermark = sizeof(int);
513 qstate->process_func = on_mp_read_session_mapper;
514 qstate->kevent_filter = EVFILT_READ;
515
516 TRACE_OUT(on_mp_read_session_read_response_write2);
517 return (0);
518 }
519
520 /*
521 * Handles session close notification by calling close_cache_mp_read_session
522 * function.
523 */
524 static int
on_mp_read_session_close_notification(struct query_state * qstate)525 on_mp_read_session_close_notification(struct query_state *qstate)
526 {
527
528 TRACE_IN(on_mp_read_session_close_notification);
529 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
530 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
531 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
532 qstate->mdata = NULL;
533 qstate->kevent_watermark = 0;
534 qstate->process_func = NULL;
535 TRACE_OUT(on_mp_read_session_close_notification);
536 return (0);
537 }
538