ViewVC Help
View File | Revision Log | Show Annotations | Download File | View Changeset | Root Listing
root/src/stable/0.8/contrib/serf/buckets/bwtp_buckets.c
Revision: 9268
Committed: Mon Feb 20 02:52:22 2017 UTC (7 years, 2 months ago) by laffer1
Content type: text/plain
File size: 17625 byte(s)
Log Message:
merge in serf 1.3.9

File Contents

# Content
1 /* ====================================================================
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 * ====================================================================
19 */
20
21 #include <apr_pools.h>
22 #include <apr_strings.h>
23 #include <apr_lib.h>
24 #include <apr_date.h>
25
26 #include "serf.h"
27 #include "serf_bucket_util.h"
28 #include "serf_bucket_types.h"
29
30 #include <stdlib.h>
31
/* This is an implementation of the Bidirectional Web Transfer Protocol
 * (BWTP). See:
 *   http://bwtp.wikidot.com/
 */
36
37 typedef struct {
38 int channel;
39 int open;
40 int type; /* 0 = header, 1 = message */ /* TODO enum? */
41 const char *phrase;
42 serf_bucket_t *headers;
43
44 char req_line[1000];
45 } frame_context_t;
46
47 typedef struct {
48 serf_bucket_t *stream;
49 serf_bucket_t *body; /* Pointer to the stream wrapping the body. */
50 serf_bucket_t *headers; /* holds parsed headers */
51
52 enum {
53 STATE_STATUS_LINE, /* reading status line */
54 STATE_HEADERS, /* reading headers */
55 STATE_BODY, /* reading body */
56 STATE_DONE /* we've sent EOF */
57 } state;
58
59 /* Buffer for accumulating a line from the response. */
60 serf_linebuf_t linebuf;
61
62 int type; /* 0 = header, 1 = message */ /* TODO enum? */
63 int channel;
64 char *phrase;
65 apr_size_t length;
66 } incoming_context_t;
67
68
69 serf_bucket_t *serf_bucket_bwtp_channel_close(
70 int channel,
71 serf_bucket_alloc_t *allocator)
72 {
73 frame_context_t *ctx;
74
75 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
76 ctx->type = 0;
77 ctx->open = 0;
78 ctx->channel = channel;
79 ctx->phrase = "CLOSED";
80 ctx->headers = serf_bucket_headers_create(allocator);
81
82 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
83 }
84
85 serf_bucket_t *serf_bucket_bwtp_channel_open(
86 int channel,
87 const char *uri,
88 serf_bucket_alloc_t *allocator)
89 {
90 frame_context_t *ctx;
91
92 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
93 ctx->type = 0;
94 ctx->open = 1;
95 ctx->channel = channel;
96 ctx->phrase = uri;
97 ctx->headers = serf_bucket_headers_create(allocator);
98
99 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
100 }
101
102 serf_bucket_t *serf_bucket_bwtp_header_create(
103 int channel,
104 const char *phrase,
105 serf_bucket_alloc_t *allocator)
106 {
107 frame_context_t *ctx;
108
109 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
110 ctx->type = 0;
111 ctx->open = 0;
112 ctx->channel = channel;
113 ctx->phrase = phrase;
114 ctx->headers = serf_bucket_headers_create(allocator);
115
116 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
117 }
118
119 serf_bucket_t *serf_bucket_bwtp_message_create(
120 int channel,
121 serf_bucket_t *body,
122 serf_bucket_alloc_t *allocator)
123 {
124 frame_context_t *ctx;
125
126 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
127 ctx->type = 1;
128 ctx->open = 0;
129 ctx->channel = channel;
130 ctx->phrase = "MESSAGE";
131 ctx->headers = serf_bucket_headers_create(allocator);
132
133 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
134 }
135
136 int serf_bucket_bwtp_frame_get_channel(
137 serf_bucket_t *bucket)
138 {
139 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
140 frame_context_t *ctx = bucket->data;
141
142 return ctx->channel;
143 }
144 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
145 incoming_context_t *ctx = bucket->data;
146
147 return ctx->channel;
148 }
149
150 return -1;
151 }
152
153 int serf_bucket_bwtp_frame_get_type(
154 serf_bucket_t *bucket)
155 {
156 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
157 frame_context_t *ctx = bucket->data;
158
159 return ctx->type;
160 }
161 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
162 incoming_context_t *ctx = bucket->data;
163
164 return ctx->type;
165 }
166
167 return -1;
168 }
169
170 const char *serf_bucket_bwtp_frame_get_phrase(
171 serf_bucket_t *bucket)
172 {
173 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
174 frame_context_t *ctx = bucket->data;
175
176 return ctx->phrase;
177 }
178 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
179 incoming_context_t *ctx = bucket->data;
180
181 return ctx->phrase;
182 }
183
184 return NULL;
185 }
186
187 serf_bucket_t *serf_bucket_bwtp_frame_get_headers(
188 serf_bucket_t *bucket)
189 {
190 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
191 frame_context_t *ctx = bucket->data;
192
193 return ctx->headers;
194 }
195 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
196 incoming_context_t *ctx = bucket->data;
197
198 return ctx->headers;
199 }
200
201 return NULL;
202 }
203
204 static int count_size(void *baton, const char *key, const char *value)
205 {
206 apr_size_t *c = baton;
207 /* TODO Deal with folding. Yikes. */
208
209 /* Add in ": " and CRLF - so an extra four bytes. */
210 *c += strlen(key) + strlen(value) + 4;
211
212 return 0;
213 }
214
215 static apr_size_t calc_header_size(serf_bucket_t *hdrs)
216 {
217 apr_size_t size = 0;
218
219 serf_bucket_headers_do(hdrs, count_size, &size);
220
221 return size;
222 }
223
224 static void serialize_data(serf_bucket_t *bucket)
225 {
226 frame_context_t *ctx = bucket->data;
227 serf_bucket_t *new_bucket;
228 apr_size_t req_len;
229
230 /* Serialize the request-line and headers into one mother string,
231 * and wrap a bucket around it.
232 */
233 req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
234 "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
235 (ctx->type ? "BWM" : "BWH"),
236 ctx->channel, calc_header_size(ctx->headers),
237 (ctx->open ? "OPEN " : ""),
238 ctx->phrase);
239 new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
240 bucket->allocator);
241
242 /* Build up the new bucket structure.
243 *
244 * Note that self needs to become an aggregate bucket so that a
245 * pointer to self still represents the "right" data.
246 */
247 serf_bucket_aggregate_become(bucket);
248
249 /* Insert the two buckets. */
250 serf_bucket_aggregate_append(bucket, new_bucket);
251 serf_bucket_aggregate_append(bucket, ctx->headers);
252
253 /* Our private context is no longer needed, and is not referred to by
254 * any existing bucket. Toss it.
255 */
256 serf_bucket_mem_free(bucket->allocator, ctx);
257 }
258
259 static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
260 apr_size_t requested,
261 const char **data, apr_size_t *len)
262 {
263 /* Seralize our private data into a new aggregate bucket. */
264 serialize_data(bucket);
265
266 /* Delegate to the "new" aggregate bucket to do the read. */
267 return serf_bucket_read(bucket, requested, data, len);
268 }
269
270 static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
271 int acceptable, int *found,
272 const char **data, apr_size_t *len)
273 {
274 /* Seralize our private data into a new aggregate bucket. */
275 serialize_data(bucket);
276
277 /* Delegate to the "new" aggregate bucket to do the readline. */
278 return serf_bucket_readline(bucket, acceptable, found, data, len);
279 }
280
281 static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
282 apr_size_t requested,
283 int vecs_size,
284 struct iovec *vecs,
285 int *vecs_used)
286 {
287 /* Seralize our private data into a new aggregate bucket. */
288 serialize_data(bucket);
289
290 /* Delegate to the "new" aggregate bucket to do the read. */
291 return serf_bucket_read_iovec(bucket, requested,
292 vecs_size, vecs, vecs_used);
293 }
294
295 static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
296 const char **data,
297 apr_size_t *len)
298 {
299 /* Seralize our private data into a new aggregate bucket. */
300 serialize_data(bucket);
301
302 /* Delegate to the "new" aggregate bucket to do the peek. */
303 return serf_bucket_peek(bucket, data, len);
304 }
305
306 const serf_bucket_type_t serf_bucket_type_bwtp_frame = {
307 "BWTP-FRAME",
308 serf_bwtp_frame_read,
309 serf_bwtp_frame_readline,
310 serf_bwtp_frame_read_iovec,
311 serf_default_read_for_sendfile,
312 serf_default_read_bucket,
313 serf_bwtp_frame_peek,
314 serf_default_destroy_and_data,
315 };
316
317
318 serf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
319 serf_bucket_t *stream,
320 serf_bucket_alloc_t *allocator)
321 {
322 incoming_context_t *ctx;
323
324 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
325 ctx->stream = stream;
326 ctx->body = NULL;
327 ctx->headers = serf_bucket_headers_create(allocator);
328 ctx->state = STATE_STATUS_LINE;
329 ctx->length = 0;
330 ctx->channel = -1;
331 ctx->phrase = NULL;
332
333 serf_linebuf_init(&ctx->linebuf);
334
335 return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
336 }
337
338 static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
339 {
340 incoming_context_t *ctx = bucket->data;
341
342 if (ctx->state != STATE_STATUS_LINE && ctx->phrase) {
343 serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase);
344 }
345
346 serf_bucket_destroy(ctx->stream);
347 if (ctx->body != NULL)
348 serf_bucket_destroy(ctx->body);
349 serf_bucket_destroy(ctx->headers);
350
351 serf_default_destroy_and_data(bucket);
352 }
353
354 static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
355 {
356 return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
357 }
358
359 static apr_status_t parse_status_line(incoming_context_t *ctx,
360 serf_bucket_alloc_t *allocator)
361 {
362 int res;
363 char *reason; /* ### stupid APR interface makes this non-const */
364
365 /* ctx->linebuf.line should be of form: BW* */
366 res = apr_date_checkmask(ctx->linebuf.line, "BW*");
367 if (!res) {
368 /* Not an BWTP response? Well, at least we won't understand it. */
369 return APR_EGENERAL;
370 }
371
372 if (ctx->linebuf.line[2] == 'H') {
373 ctx->type = 0;
374 }
375 else if (ctx->linebuf.line[2] == 'M') {
376 ctx->type = 1;
377 }
378 else {
379 ctx->type = -1;
380 }
381
382 ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
383
384 /* Skip leading spaces for the reason string. */
385 if (apr_isspace(*reason)) {
386 reason++;
387 }
388
389 ctx->length = apr_strtoi64(reason, &reason, 16);
390
391 /* Skip leading spaces for the reason string. */
392 if (reason - ctx->linebuf.line < ctx->linebuf.used) {
393 if (apr_isspace(*reason)) {
394 reason++;
395 }
396
397 ctx->phrase = serf_bstrmemdup(allocator, reason,
398 ctx->linebuf.used
399 - (reason - ctx->linebuf.line));
400 } else {
401 ctx->phrase = NULL;
402 }
403
404 return APR_SUCCESS;
405 }
406
407 /* This code should be replaced with header buckets. */
408 static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
409 {
410 apr_status_t status;
411
412 /* RFC 2616 says that CRLF is the only line ending, but we can easily
413 * accept any kind of line ending.
414 */
415 status = fetch_line(ctx, SERF_NEWLINE_ANY);
416 if (SERF_BUCKET_READ_ERROR(status)) {
417 return status;
418 }
419 /* Something was read. Process it. */
420
421 if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
422 const char *end_key;
423 const char *c;
424
425 end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
426 if (!c) {
427 /* Bad headers? */
428 return APR_EGENERAL;
429 }
430
431 /* Skip over initial : and spaces. */
432 while (apr_isspace(*++c))
433 continue;
434
435 /* Always copy the headers (from the linebuf into new mem). */
436 /* ### we should be able to optimize some mem copies */
437 serf_bucket_headers_setx(
438 ctx->headers,
439 ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
440 c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
441 }
442
443 return status;
444 }
445
446 /* Perform one iteration of the state machine.
447 *
448 * Will return when one the following conditions occurred:
449 * 1) a state change
450 * 2) an error
451 * 3) the stream is not ready or at EOF
452 * 4) APR_SUCCESS, meaning the machine can be run again immediately
453 */
454 static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
455 {
456 apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
457
458 switch (ctx->state) {
459 case STATE_STATUS_LINE:
460 /* RFC 2616 says that CRLF is the only line ending, but we can easily
461 * accept any kind of line ending.
462 */
463 status = fetch_line(ctx, SERF_NEWLINE_ANY);
464 if (SERF_BUCKET_READ_ERROR(status))
465 return status;
466
467 if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
468 /* The Status-Line is in the line buffer. Process it. */
469 status = parse_status_line(ctx, bkt->allocator);
470 if (status)
471 return status;
472
473 if (ctx->length) {
474 ctx->body =
475 serf_bucket_barrier_create(ctx->stream, bkt->allocator);
476 ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
477 bkt->allocator);
478 if (!ctx->type) {
479 ctx->state = STATE_HEADERS;
480 } else {
481 ctx->state = STATE_BODY;
482 }
483 } else {
484 ctx->state = STATE_DONE;
485 }
486 }
487 else {
488 /* The connection closed before we could get the next
489 * response. Treat the request as lost so that our upper
490 * end knows the server never tried to give us a response.
491 */
492 if (APR_STATUS_IS_EOF(status)) {
493 return SERF_ERROR_REQUEST_LOST;
494 }
495 }
496 break;
497 case STATE_HEADERS:
498 status = fetch_headers(ctx->body, ctx);
499 if (SERF_BUCKET_READ_ERROR(status))
500 return status;
501
502 /* If an empty line was read, then we hit the end of the headers.
503 * Move on to the body.
504 */
505 if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
506 /* Advance the state. */
507 ctx->state = STATE_DONE;
508 }
509 break;
510 case STATE_BODY:
511 /* Don't do anything. */
512 break;
513 case STATE_DONE:
514 return APR_EOF;
515 default:
516 /* Not reachable */
517 return APR_EGENERAL;
518 }
519
520 return status;
521 }
522
523 static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
524 {
525 apr_status_t status;
526
527 /* Keep reading and moving through states if we aren't at the BODY */
528 while (ctx->state != STATE_BODY) {
529 status = run_machine(bkt, ctx);
530
531 /* Anything other than APR_SUCCESS means that we cannot immediately
532 * read again (for now).
533 */
534 if (status)
535 return status;
536 }
537 /* in STATE_BODY */
538
539 return APR_SUCCESS;
540 }
541
542 apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
543 serf_bucket_t *bucket)
544 {
545 incoming_context_t *ctx = bucket->data;
546
547 return wait_for_body(bucket, ctx);
548 }
549
550 static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
551 apr_size_t requested,
552 const char **data, apr_size_t *len)
553 {
554 incoming_context_t *ctx = bucket->data;
555 apr_status_t rv;
556
557 rv = wait_for_body(bucket, ctx);
558 if (rv) {
559 /* It's not possible to have read anything yet! */
560 if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
561 *len = 0;
562 }
563 return rv;
564 }
565
566 rv = serf_bucket_read(ctx->body, requested, data, len);
567 if (APR_STATUS_IS_EOF(rv)) {
568 ctx->state = STATE_DONE;
569 }
570 return rv;
571 }
572
573 static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
574 int acceptable, int *found,
575 const char **data, apr_size_t *len)
576 {
577 incoming_context_t *ctx = bucket->data;
578 apr_status_t rv;
579
580 rv = wait_for_body(bucket, ctx);
581 if (rv) {
582 return rv;
583 }
584
585 /* Delegate to the stream bucket to do the readline. */
586 return serf_bucket_readline(ctx->body, acceptable, found, data, len);
587 }
588
589 /* ### need to implement */
590 #define bwtp_incoming_peek NULL
591
592 const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
593 "BWTP-INCOMING",
594 bwtp_incoming_read,
595 bwtp_incoming_readline,
596 serf_default_read_iovec,
597 serf_default_read_for_sendfile,
598 serf_default_read_bucket,
599 bwtp_incoming_peek,
600 bwtp_incoming_destroy_and_data,
601 };

Properties

Name Value
svn:keywords MidnightBSD=%H