1 /*        $NetBSD: bufferevent_ratelim.c,v 1.6 2021/04/10 19:18:45 rillig Exp $ */
2 
3 /*
4  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
5  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. The name of the author may not be used to endorse or promote products
17  *    derived from this software without specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
20  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
21  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
22  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
23  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
24  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 #include "evconfig-private.h"
31 
32 #include <sys/types.h>
33 #include <limits.h>
34 #include <string.h>
35 #include <stdlib.h>
36 
37 #include "event2/event.h"
38 #include "event2/event_struct.h"
39 #include "event2/util.h"
40 #include "event2/bufferevent.h"
41 #include "event2/bufferevent_struct.h"
42 #include "event2/buffer.h"
43 
44 #include "ratelim-internal.h"
45 
46 #include "bufferevent-internal.h"
47 #include "mm-internal.h"
48 #include "util-internal.h"
49 #include "event-internal.h"
50 
51 int
ev_token_bucket_init_(struct ev_token_bucket * bucket,const struct ev_token_bucket_cfg * cfg,ev_uint32_t current_tick,int reinitialize)52 ev_token_bucket_init_(struct ev_token_bucket *bucket,
53     const struct ev_token_bucket_cfg *cfg,
54     ev_uint32_t current_tick,
55     int reinitialize)
56 {
57           if (reinitialize) {
58                     /* on reinitialization, we only clip downwards, since we've
59                        already used who-knows-how-much bandwidth this tick.  We
60                        leave "last_updated" as it is; the next update will add the
61                        appropriate amount of bandwidth to the bucket.
62                     */
63                     if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
64                               bucket->read_limit = cfg->read_maximum;
65                     if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
66                               bucket->write_limit = cfg->write_maximum;
67           } else {
68                     bucket->read_limit = cfg->read_rate;
69                     bucket->write_limit = cfg->write_rate;
70                     bucket->last_updated = current_tick;
71           }
72           return 0;
73 }
74 
75 int
ev_token_bucket_update_(struct ev_token_bucket * bucket,const struct ev_token_bucket_cfg * cfg,ev_uint32_t current_tick)76 ev_token_bucket_update_(struct ev_token_bucket *bucket,
77     const struct ev_token_bucket_cfg *cfg,
78     ev_uint32_t current_tick)
79 {
80           /* It's okay if the tick number overflows, since we'll just
81            * wrap around when we do the unsigned substraction. */
82           unsigned n_ticks = current_tick - bucket->last_updated;
83 
84           /* Make sure some ticks actually happened, and that time didn't
85            * roll back. */
86           if (n_ticks == 0 || n_ticks > INT_MAX)
87                     return 0;
88 
89           /* Naively, we would say
90                     bucket->limit += n_ticks * cfg->rate;
91 
92                     if (bucket->limit > cfg->maximum)
93                               bucket->limit = cfg->maximum;
94 
95              But we're worried about overflow, so we do it like this:
96           */
97 
98           if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
99                     bucket->read_limit = cfg->read_maximum;
100           else
101                     bucket->read_limit += n_ticks * cfg->read_rate;
102 
103 
104           if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
105                     bucket->write_limit = cfg->write_maximum;
106           else
107                     bucket->write_limit += n_ticks * cfg->write_rate;
108 
109 
110           bucket->last_updated = current_tick;
111 
112           return 1;
113 }
114 
115 static inline void
bufferevent_update_buckets(struct bufferevent_private * bev)116 bufferevent_update_buckets(struct bufferevent_private *bev)
117 {
118           /* Must hold lock on bev. */
119           struct timeval now;
120           unsigned tick;
121           event_base_gettimeofday_cached(bev->bev.ev_base, &now);
122           tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
123           if (tick != bev->rate_limiting->limit.last_updated)
124                     ev_token_bucket_update_(&bev->rate_limiting->limit,
125                         bev->rate_limiting->cfg, tick);
126 }
127 
128 ev_uint32_t
ev_token_bucket_get_tick_(const struct timeval * tv,const struct ev_token_bucket_cfg * cfg)129 ev_token_bucket_get_tick_(const struct timeval *tv,
130     const struct ev_token_bucket_cfg *cfg)
131 {
132           /* This computation uses two multiplies and a divide.  We could do
133            * fewer if we knew that the tick length was an integer number of
134            * seconds, or if we knew it divided evenly into a second.  We should
135            * investigate that more.
136            */
137 
138           /* We cast to an ev_uint64_t first, since we don't want to overflow
139            * before we do the final divide. */
140           ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
141           return (unsigned)(msec / cfg->msec_per_tick);
142 }
143 
144 struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate,size_t read_burst,size_t write_rate,size_t write_burst,const struct timeval * tick_len)145 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
146     size_t write_rate, size_t write_burst,
147     const struct timeval *tick_len)
148 {
149           struct ev_token_bucket_cfg *r;
150           struct timeval g;
151           if (! tick_len) {
152                     g.tv_sec = 1;
153                     g.tv_usec = 0;
154                     tick_len = &g;
155           }
156           if (read_rate > read_burst || write_rate > write_burst ||
157               read_rate < 1 || write_rate < 1)
158                     return NULL;
159           if (read_rate > EV_RATE_LIMIT_MAX ||
160               write_rate > EV_RATE_LIMIT_MAX ||
161               read_burst > EV_RATE_LIMIT_MAX ||
162               write_burst > EV_RATE_LIMIT_MAX)
163                     return NULL;
164           r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
165           if (!r)
166                     return NULL;
167           r->read_rate = read_rate;
168           r->write_rate = write_rate;
169           r->read_maximum = read_burst;
170           r->write_maximum = write_burst;
171           memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
172           r->msec_per_tick = (tick_len->tv_sec * 1000) +
173               (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
174           return r;
175 }
176 
/*
 * Release a configuration obtained from ev_token_bucket_cfg_new(), via
 * mm_free().  bufferevent_set_rate_limit() keeps a pointer to its cfg
 * rather than a copy.  Make sure no bufferevent still uses 'cfg' before
 * freeing it.
 */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
182 
/* Default per-operation byte caps for the max_single_read and
 * max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Take / release the lock of a rate-limit group. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Group-wide suspend / resume helpers, defined further down.  Callers
 * in this file invoke them with the group lock held. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
194 
195 /** Helper: figure out the maximum amount we should write if is_write, or
196     the maximum amount we should read if is_read.  Return that maximum, or
197     0 if our bucket is wholly exhausted.
198  */
199 static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private * bev,int is_write)200 bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
201 {
202           /* needs lock on bev. */
203           ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;
204 
205 #define LIM(x)                                                        \
206           (is_write ? (x).write_limit : (x).read_limit)
207 
208 #define GROUP_SUSPENDED(g)                        \
209           (is_write ? (g)->write_suspended : (g)->read_suspended)
210 
211           /* Sets max_so_far to MIN(x, max_so_far) */
212 #define CLAMPTO(x)                                \
213           do {                                              \
214                     if (max_so_far > (x))                   \
215                               max_so_far = (x);   \
216           } while (0);
217 
218           if (!bev->rate_limiting)
219                     return max_so_far;
220 
221           /* If rate-limiting is enabled at all, update the appropriate
222              bucket, and take the smaller of our rate limit and the group
223              rate limit.
224            */
225 
226           if (bev->rate_limiting->cfg) {
227                     bufferevent_update_buckets(bev);
228                     max_so_far = LIM(bev->rate_limiting->limit);
229           }
230           if (bev->rate_limiting->group) {
231                     struct bufferevent_rate_limit_group *g =
232                         bev->rate_limiting->group;
233                     ev_ssize_t share;
234                     LOCK_GROUP(g);
235                     if (GROUP_SUSPENDED(g)) {
236                               /* We can get here if we failed to lock this
237                                * particular bufferevent while suspending the whole
238                                * group. */
239                               if (is_write)
240                                         bufferevent_suspend_write_(&bev->bev,
241                                             BEV_SUSPEND_BW_GROUP);
242                               else
243                                         bufferevent_suspend_read_(&bev->bev,
244                                             BEV_SUSPEND_BW_GROUP);
245                               share = 0;
246                     } else {
247                               /* XXXX probably we should divide among the active
248                                * members, not the total members. */
249                               share = LIM(g->rate_limit) / g->n_members;
250                               if (share < g->min_share)
251                                         share = g->min_share;
252                     }
253                     UNLOCK_GROUP(g);
254                     CLAMPTO(share);
255           }
256 
257           if (max_so_far < 0)
258                     max_so_far = 0;
259           return max_so_far;
260 }
261 
262 ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private * bev)263 bufferevent_get_read_max_(struct bufferevent_private *bev)
264 {
265           return bufferevent_get_rlim_max_(bev, 0);
266 }
267 
268 ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private * bev)269 bufferevent_get_write_max_(struct bufferevent_private *bev)
270 {
271           return bufferevent_get_rlim_max_(bev, 1);
272 }
273 
274 int
bufferevent_decrement_read_buckets_(struct bufferevent_private * bev,ev_ssize_t bytes)275 bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
276 {
277           /* XXXXX Make sure all users of this function check its return value */
278           int r = 0;
279           /* need to hold lock on bev */
280           if (!bev->rate_limiting)
281                     return 0;
282 
283           if (bev->rate_limiting->cfg) {
284                     bev->rate_limiting->limit.read_limit -= bytes;
285                     if (bev->rate_limiting->limit.read_limit <= 0) {
286                               bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
287                               if (event_add(&bev->rate_limiting->refill_bucket_event,
288                                         &bev->rate_limiting->cfg->tick_timeout) < 0)
289                                         r = -1;
290                     } else if (bev->read_suspended & BEV_SUSPEND_BW) {
291                               if (!(bev->write_suspended & BEV_SUSPEND_BW))
292                                         event_del(&bev->rate_limiting->refill_bucket_event);
293                               bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
294                     }
295           }
296 
297           if (bev->rate_limiting->group) {
298                     LOCK_GROUP(bev->rate_limiting->group);
299                     bev->rate_limiting->group->rate_limit.read_limit -= bytes;
300                     bev->rate_limiting->group->total_read += bytes;
301                     if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
302                               bev_group_suspend_reading_(bev->rate_limiting->group);
303                     } else if (bev->rate_limiting->group->read_suspended) {
304                               bev_group_unsuspend_reading_(bev->rate_limiting->group);
305                     }
306                     UNLOCK_GROUP(bev->rate_limiting->group);
307           }
308 
309           return r;
310 }
311 
312 int
bufferevent_decrement_write_buckets_(struct bufferevent_private * bev,ev_ssize_t bytes)313 bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
314 {
315           /* XXXXX Make sure all users of this function check its return value */
316           int r = 0;
317           /* need to hold lock */
318           if (!bev->rate_limiting)
319                     return 0;
320 
321           if (bev->rate_limiting->cfg) {
322                     bev->rate_limiting->limit.write_limit -= bytes;
323                     if (bev->rate_limiting->limit.write_limit <= 0) {
324                               bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
325                               if (event_add(&bev->rate_limiting->refill_bucket_event,
326                                         &bev->rate_limiting->cfg->tick_timeout) < 0)
327                                         r = -1;
328                     } else if (bev->write_suspended & BEV_SUSPEND_BW) {
329                               if (!(bev->read_suspended & BEV_SUSPEND_BW))
330                                         event_del(&bev->rate_limiting->refill_bucket_event);
331                               bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
332                     }
333           }
334 
335           if (bev->rate_limiting->group) {
336                     LOCK_GROUP(bev->rate_limiting->group);
337                     bev->rate_limiting->group->rate_limit.write_limit -= bytes;
338                     bev->rate_limiting->group->total_written += bytes;
339                     if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
340                               bev_group_suspend_writing_(bev->rate_limiting->group);
341                     } else if (bev->rate_limiting->group->write_suspended) {
342                               bev_group_unsuspend_writing_(bev->rate_limiting->group);
343                     }
344                     UNLOCK_GROUP(bev->rate_limiting->group);
345           }
346 
347           return r;
348 }
349 
350 /** Stop reading on every bufferevent in <b>g</b> */
351 static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group * g)352 bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
353 {
354           /* Needs group lock */
355           struct bufferevent_private *bev;
356           g->read_suspended = 1;
357           g->pending_unsuspend_read = 0;
358 
359           /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
360              to prevent a deadlock.  (Ordinarily, the group lock nests inside
361              the bufferevent locks.  If we are unable to lock any individual
362              bufferevent, it will find out later when it looks at its limit
363              and sees that its group is suspended.)
364           */
365           LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
366                     if (EVLOCK_TRY_LOCK_(bev->lock)) {
367                               bufferevent_suspend_read_(&bev->bev,
368                                   BEV_SUSPEND_BW_GROUP);
369                               EVLOCK_UNLOCK(bev->lock, 0);
370                     }
371           }
372           return 0;
373 }
374 
375 /** Stop writing on every bufferevent in <b>g</b> */
376 static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group * g)377 bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
378 {
379           /* Needs group lock */
380           struct bufferevent_private *bev;
381           g->write_suspended = 1;
382           g->pending_unsuspend_write = 0;
383           LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
384                     if (EVLOCK_TRY_LOCK_(bev->lock)) {
385                               bufferevent_suspend_write_(&bev->bev,
386                                   BEV_SUSPEND_BW_GROUP);
387                               EVLOCK_UNLOCK(bev->lock, 0);
388                     }
389           }
390           return 0;
391 }
392 
393 /** Timer callback invoked on a single bufferevent with one or more exhausted
394     buckets when they are ready to refill. */
395 static void
bev_refill_callback_(evutil_socket_t fd,short what,void * arg)396 bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
397 {
398           unsigned tick;
399           struct timeval now;
400           struct bufferevent_private *bev = arg;
401           int again = 0;
402           BEV_LOCK(&bev->bev);
403           if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
404                     BEV_UNLOCK(&bev->bev);
405                     return;
406           }
407 
408           /* First, update the bucket */
409           event_base_gettimeofday_cached(bev->bev.ev_base, &now);
410           tick = ev_token_bucket_get_tick_(&now,
411               bev->rate_limiting->cfg);
412           ev_token_bucket_update_(&bev->rate_limiting->limit,
413               bev->rate_limiting->cfg,
414               tick);
415 
416           /* Now unsuspend any read/write operations as appropriate. */
417           if ((bev->read_suspended & BEV_SUSPEND_BW)) {
418                     if (bev->rate_limiting->limit.read_limit > 0)
419                               bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
420                     else
421                               again = 1;
422           }
423           if ((bev->write_suspended & BEV_SUSPEND_BW)) {
424                     if (bev->rate_limiting->limit.write_limit > 0)
425                               bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
426                     else
427                               again = 1;
428           }
429           if (again) {
430                     /* One or more of the buckets may need another refill if they
431                        started negative.
432 
433                        XXXX if we need to be quiet for more ticks, we should
434                        maybe figure out what timeout we really want.
435                     */
436                     /* XXXX Handle event_add failure somehow */
437                     event_add(&bev->rate_limiting->refill_bucket_event,
438                         &bev->rate_limiting->cfg->tick_timeout);
439           }
440           BEV_UNLOCK(&bev->bev);
441 }
442 
443 /** Helper: grab a random element from a bufferevent group.
444  *
445  * Requires that we hold the lock on the group.
446  */
447 static struct bufferevent_private *
bev_group_random_element_(struct bufferevent_rate_limit_group * group)448 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
449 {
450           int which;
451           struct bufferevent_private *bev;
452 
453           /* requires group lock */
454 
455           if (!group->n_members)
456                     return NULL;
457 
458           EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
459 
460           which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
461 
462           bev = LIST_FIRST(&group->members);
463           while (which--)
464                     bev = LIST_NEXT(bev, rate_limiting->next_in_group);
465 
466           return bev;
467 }
468 
/** Iterate over the members of rate-limiting group 'g', starting at a
    randomly chosen member.  Iteration runs to the end of the list, wraps
    to its head, and stops just before the starting member.

    Each member is assigned to the caller's variable 'bev' before 'block'
    runs.  The caller must also declare 'first' of the same type.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)					\
	do {								\
		first = bev_group_random_element_(g);			\
		for (bev = first;					\
		     bev != LIST_END(&g->members);			\
		     bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
		for (bev = LIST_FIRST(&g->members);			\
		     bev != NULL && bev != first;			\
		     bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
488 
489 static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group * g)490 bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
491 {
492           int again = 0;
493           struct bufferevent_private *bev, *first;
494 
495           g->read_suspended = 0;
496           FOREACH_RANDOM_ORDER({
497                     if (EVLOCK_TRY_LOCK_(bev->lock)) {
498                               bufferevent_unsuspend_read_(&bev->bev,
499                                   BEV_SUSPEND_BW_GROUP);
500                               EVLOCK_UNLOCK(bev->lock, 0);
501                     } else {
502                               again = 1;
503                     }
504           });
505           g->pending_unsuspend_read = again;
506 }
507 
508 static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group * g)509 bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
510 {
511           int again = 0;
512           struct bufferevent_private *bev, *first;
513           g->write_suspended = 0;
514 
515           FOREACH_RANDOM_ORDER({
516                     if (EVLOCK_TRY_LOCK_(bev->lock)) {
517                               bufferevent_unsuspend_write_(&bev->bev,
518                                   BEV_SUSPEND_BW_GROUP);
519                               EVLOCK_UNLOCK(bev->lock, 0);
520                     } else {
521                               again = 1;
522                     }
523           });
524           g->pending_unsuspend_write = again;
525 }
526 
527 /** Callback invoked every tick to add more elements to the group bucket
528     and unsuspend group members as needed.
529  */
530 static void
bev_group_refill_callback_(evutil_socket_t fd,short what,void * arg)531 bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
532 {
533           struct bufferevent_rate_limit_group *g = arg;
534           unsigned tick;
535           struct timeval now;
536 
537           event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
538 
539           LOCK_GROUP(g);
540 
541           tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
542           ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);
543 
544           if (g->pending_unsuspend_read ||
545               (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
546                     bev_group_unsuspend_reading_(g);
547           }
548           if (g->pending_unsuspend_write ||
549               (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
550                     bev_group_unsuspend_writing_(g);
551           }
552 
553           /* XXXX Rather than waiting to the next tick to unsuspend stuff
554            * with pending_unsuspend_write/read, we should do it on the
555            * next iteration of the mainloop.
556            */
557 
558           UNLOCK_GROUP(g);
559 }
560 
561 int
bufferevent_set_rate_limit(struct bufferevent * bev,struct ev_token_bucket_cfg * cfg)562 bufferevent_set_rate_limit(struct bufferevent *bev,
563     struct ev_token_bucket_cfg *cfg)
564 {
565           struct bufferevent_private *bevp = BEV_UPCAST(bev);
566           int r = -1;
567           struct bufferevent_rate_limit *rlim;
568           struct timeval now;
569           ev_uint32_t tick;
570           int reinit = 0, suspended = 0;
571           /* XXX reference-count cfg */
572 
573           BEV_LOCK(bev);
574 
575           if (cfg == NULL) {
576                     if (bevp->rate_limiting) {
577                               rlim = bevp->rate_limiting;
578                               rlim->cfg = NULL;
579                               bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
580                               bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
581                               if (event_initialized(&rlim->refill_bucket_event))
582                                         event_del(&rlim->refill_bucket_event);
583                     }
584                     r = 0;
585                     goto done;
586           }
587 
588           event_base_gettimeofday_cached(bev->ev_base, &now);
589           tick = ev_token_bucket_get_tick_(&now, cfg);
590 
591           if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
592                     /* no-op */
593                     r = 0;
594                     goto done;
595           }
596           if (bevp->rate_limiting == NULL) {
597                     rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
598                     if (!rlim)
599                               goto done;
600                     bevp->rate_limiting = rlim;
601           } else {
602                     rlim = bevp->rate_limiting;
603           }
604           reinit = rlim->cfg != NULL;
605 
606           rlim->cfg = cfg;
607           ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);
608 
609           if (reinit) {
610                     EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
611                     event_del(&rlim->refill_bucket_event);
612           }
613           event_assign(&rlim->refill_bucket_event, bev->ev_base,
614               -1, EV_FINALIZE, bev_refill_callback_, bevp);
615 
616           if (rlim->limit.read_limit > 0) {
617                     bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
618           } else {
619                     bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
620                     suspended=1;
621           }
622           if (rlim->limit.write_limit > 0) {
623                     bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
624           } else {
625                     bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
626                     suspended = 1;
627           }
628 
629           if (suspended)
630                     event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
631 
632           r = 0;
633 
634 done:
635           BEV_UNLOCK(bev);
636           return r;
637 }
638 
639 struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base * base,const struct ev_token_bucket_cfg * cfg)640 bufferevent_rate_limit_group_new(struct event_base *base,
641     const struct ev_token_bucket_cfg *cfg)
642 {
643           struct bufferevent_rate_limit_group *g;
644           struct timeval now;
645           ev_uint32_t tick;
646 
647           event_base_gettimeofday_cached(base, &now);
648           tick = ev_token_bucket_get_tick_(&now, cfg);
649 
650           g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
651           if (!g)
652                     return NULL;
653           memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
654           LIST_INIT(&g->members);
655 
656           ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
657 
658           event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
659               bev_group_refill_callback_, g);
660           /*XXXX handle event_add failure */
661           event_add(&g->master_refill_event, &cfg->tick_timeout);
662 
663           EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
664 
665           bufferevent_rate_limit_group_set_min_share(g, 64);
666 
667           evutil_weakrand_seed_(&g->weakrand_seed,
668               (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
669 
670           return g;
671 }
672 
673 int
bufferevent_rate_limit_group_set_cfg(struct bufferevent_rate_limit_group * g,const struct ev_token_bucket_cfg * cfg)674 bufferevent_rate_limit_group_set_cfg(
675           struct bufferevent_rate_limit_group *g,
676           const struct ev_token_bucket_cfg *cfg)
677 {
678           int same_tick;
679           if (!g || !cfg)
680                     return -1;
681 
682           LOCK_GROUP(g);
683           same_tick = evutil_timercmp(
684                     &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
685           memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
686 
687           if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
688                     g->rate_limit.read_limit = cfg->read_maximum;
689           if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
690                     g->rate_limit.write_limit = cfg->write_maximum;
691 
692           if (!same_tick) {
693                     /* This can cause a hiccup in the schedule */
694                     event_add(&g->master_refill_event, &cfg->tick_timeout);
695           }
696 
697           /* The new limits might force us to adjust min_share differently. */
698           bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
699 
700           UNLOCK_GROUP(g);
701           return 0;
702 }
703 
704 int
bufferevent_rate_limit_group_set_min_share(struct bufferevent_rate_limit_group * g,size_t share)705 bufferevent_rate_limit_group_set_min_share(
706           struct bufferevent_rate_limit_group *g,
707           size_t share)
708 {
709           if (share > EV_SSIZE_MAX)
710                     return -1;
711 
712           g->configured_min_share = share;
713 
714           /* Can't set share to less than the one-tick maximum.  IOW, at steady
715            * state, at least one connection can go per tick. */
716           if (share > g->rate_limit_cfg.read_rate)
717                     share = g->rate_limit_cfg.read_rate;
718           if (share > g->rate_limit_cfg.write_rate)
719                     share = g->rate_limit_cfg.write_rate;
720 
721           g->min_share = share;
722           return 0;
723 }
724 
725 void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group * g)726 bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
727 {
728           LOCK_GROUP(g);
729           EVUTIL_ASSERT(0 == g->n_members);
730           event_del(&g->master_refill_event);
731           UNLOCK_GROUP(g);
732           EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
733           mm_free(g);
734 }
735 
736 int
bufferevent_add_to_rate_limit_group(struct bufferevent * bev,struct bufferevent_rate_limit_group * g)737 bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
738     struct bufferevent_rate_limit_group *g)
739 {
740           int wsuspend, rsuspend;
741           struct bufferevent_private *bevp = BEV_UPCAST(bev);
742           BEV_LOCK(bev);
743 
744           if (!bevp->rate_limiting) {
745                     struct bufferevent_rate_limit *rlim;
746                     rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
747                     if (!rlim) {
748                               BEV_UNLOCK(bev);
749                               return -1;
750                     }
751                     event_assign(&rlim->refill_bucket_event, bev->ev_base,
752                         -1, EV_FINALIZE, bev_refill_callback_, bevp);
753                     bevp->rate_limiting = rlim;
754           }
755 
756           if (bevp->rate_limiting->group == g) {
757                     BEV_UNLOCK(bev);
758                     return 0;
759           }
760           if (bevp->rate_limiting->group)
761                     bufferevent_remove_from_rate_limit_group(bev);
762 
763           LOCK_GROUP(g);
764           bevp->rate_limiting->group = g;
765           ++g->n_members;
766           LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);
767 
768           rsuspend = g->read_suspended;
769           wsuspend = g->write_suspended;
770 
771           UNLOCK_GROUP(g);
772 
773           if (rsuspend)
774                     bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
775           if (wsuspend)
776                     bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);
777 
778           BEV_UNLOCK(bev);
779           return 0;
780 }
781 
/*
 * Public entry point: detach 'bev' from its rate-limit group (if any) and
 * always lift any BEV_SUSPEND_BW_GROUP suspension on both directions.
 * Always returns 0.
 */
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	const int lift_group_suspension = 1;

	return bufferevent_remove_from_rate_limit_group_internal_(bev,
	    lift_group_suspension);
}
787 
788 int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent * bev,int unsuspend)789 bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
790     int unsuspend)
791 {
792           struct bufferevent_private *bevp = BEV_UPCAST(bev);
793           BEV_LOCK(bev);
794           if (bevp->rate_limiting && bevp->rate_limiting->group) {
795                     struct bufferevent_rate_limit_group *g =
796                         bevp->rate_limiting->group;
797                     LOCK_GROUP(g);
798                     bevp->rate_limiting->group = NULL;
799                     --g->n_members;
800                     LIST_REMOVE(bevp, rate_limiting->next_in_group);
801                     UNLOCK_GROUP(g);
802           }
803           if (unsuspend) {
804                     bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
805                     bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
806           }
807           BEV_UNLOCK(bev);
808           return 0;
809 }
810 
811 /* ===
812  * API functions to expose rate limits.
813  *
814  * Don't use these from inside Libevent; they're meant to be for use by
815  * the program.
816  * === */
817 
818 /* Mostly you don't want to use this function from inside libevent;
819  * bufferevent_get_read_max_() is more likely what you want*/
820 ev_ssize_t
bufferevent_get_read_limit(struct bufferevent * bev)821 bufferevent_get_read_limit(struct bufferevent *bev)
822 {
823           ev_ssize_t r;
824           struct bufferevent_private *bevp;
825           BEV_LOCK(bev);
826           bevp = BEV_UPCAST(bev);
827           if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
828                     bufferevent_update_buckets(bevp);
829                     r = bevp->rate_limiting->limit.read_limit;
830           } else {
831                     r = EV_SSIZE_MAX;
832           }
833           BEV_UNLOCK(bev);
834           return r;
835 }
836 
837 /* Mostly you don't want to use this function from inside libevent;
838  * bufferevent_get_write_max_() is more likely what you want*/
839 ev_ssize_t
bufferevent_get_write_limit(struct bufferevent * bev)840 bufferevent_get_write_limit(struct bufferevent *bev)
841 {
842           ev_ssize_t r;
843           struct bufferevent_private *bevp;
844           BEV_LOCK(bev);
845           bevp = BEV_UPCAST(bev);
846           if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
847                     bufferevent_update_buckets(bevp);
848                     r = bevp->rate_limiting->limit.write_limit;
849           } else {
850                     r = EV_SSIZE_MAX;
851           }
852           BEV_UNLOCK(bev);
853           return r;
854 }
855 
856 int
bufferevent_set_max_single_read(struct bufferevent * bev,size_t size)857 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
858 {
859           struct bufferevent_private *bevp;
860           BEV_LOCK(bev);
861           bevp = BEV_UPCAST(bev);
862           if (size == 0 || size > EV_SSIZE_MAX)
863                     bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
864           else
865                     bevp->max_single_read = size;
866           BEV_UNLOCK(bev);
867           return 0;
868 }
869 
870 int
bufferevent_set_max_single_write(struct bufferevent * bev,size_t size)871 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
872 {
873           struct bufferevent_private *bevp;
874           BEV_LOCK(bev);
875           bevp = BEV_UPCAST(bev);
876           if (size == 0 || size > EV_SSIZE_MAX)
877                     bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
878           else
879                     bevp->max_single_write = size;
880           BEV_UNLOCK(bev);
881           return 0;
882 }
883 
884 ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent * bev)885 bufferevent_get_max_single_read(struct bufferevent *bev)
886 {
887           ev_ssize_t r;
888 
889           BEV_LOCK(bev);
890           r = BEV_UPCAST(bev)->max_single_read;
891           BEV_UNLOCK(bev);
892           return r;
893 }
894 
895 ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent * bev)896 bufferevent_get_max_single_write(struct bufferevent *bev)
897 {
898           ev_ssize_t r;
899 
900           BEV_LOCK(bev);
901           r = BEV_UPCAST(bev)->max_single_write;
902           BEV_UNLOCK(bev);
903           return r;
904 }
905 
906 ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent * bev)907 bufferevent_get_max_to_read(struct bufferevent *bev)
908 {
909           ev_ssize_t r;
910           BEV_LOCK(bev);
911           r = bufferevent_get_read_max_(BEV_UPCAST(bev));
912           BEV_UNLOCK(bev);
913           return r;
914 }
915 
916 ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent * bev)917 bufferevent_get_max_to_write(struct bufferevent *bev)
918 {
919           ev_ssize_t r;
920           BEV_LOCK(bev);
921           r = bufferevent_get_write_max_(BEV_UPCAST(bev));
922           BEV_UNLOCK(bev);
923           return r;
924 }
925 
926 const struct ev_token_bucket_cfg *
bufferevent_get_token_bucket_cfg(const struct bufferevent * bev)927 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
928           struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
929           struct ev_token_bucket_cfg *cfg;
930 
931           BEV_LOCK(bev);
932 
933           if (bufev_private->rate_limiting) {
934                     cfg = bufev_private->rate_limiting->cfg;
935           } else {
936                     cfg = NULL;
937           }
938 
939           BEV_UNLOCK(bev);
940 
941           return cfg;
942 }
943 
944 /* Mostly you don't want to use this function from inside libevent;
945  * bufferevent_get_read_max_() is more likely what you want*/
946 ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(struct bufferevent_rate_limit_group * grp)947 bufferevent_rate_limit_group_get_read_limit(
948           struct bufferevent_rate_limit_group *grp)
949 {
950           ev_ssize_t r;
951           LOCK_GROUP(grp);
952           r = grp->rate_limit.read_limit;
953           UNLOCK_GROUP(grp);
954           return r;
955 }
956 
957 /* Mostly you don't want to use this function from inside libevent;
958  * bufferevent_get_write_max_() is more likely what you want. */
959 ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(struct bufferevent_rate_limit_group * grp)960 bufferevent_rate_limit_group_get_write_limit(
961           struct bufferevent_rate_limit_group *grp)
962 {
963           ev_ssize_t r;
964           LOCK_GROUP(grp);
965           r = grp->rate_limit.write_limit;
966           UNLOCK_GROUP(grp);
967           return r;
968 }
969 
970 int
bufferevent_decrement_read_limit(struct bufferevent * bev,ev_ssize_t decr)971 bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
972 {
973           int r = 0;
974           ev_ssize_t old_limit, new_limit;
975           struct bufferevent_private *bevp;
976           BEV_LOCK(bev);
977           bevp = BEV_UPCAST(bev);
978           EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
979           old_limit = bevp->rate_limiting->limit.read_limit;
980 
981           new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
982           if (old_limit > 0 && new_limit <= 0) {
983                     bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
984                     if (event_add(&bevp->rate_limiting->refill_bucket_event,
985                               &bevp->rate_limiting->cfg->tick_timeout) < 0)
986                               r = -1;
987           } else if (old_limit <= 0 && new_limit > 0) {
988                     if (!(bevp->write_suspended & BEV_SUSPEND_BW))
989                               event_del(&bevp->rate_limiting->refill_bucket_event);
990                     bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
991           }
992 
993           BEV_UNLOCK(bev);
994           return r;
995 }
996 
997 int
bufferevent_decrement_write_limit(struct bufferevent * bev,ev_ssize_t decr)998 bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
999 {
1000           /* XXXX this is mostly copy-and-paste from
1001            * bufferevent_decrement_read_limit */
1002           int r = 0;
1003           ev_ssize_t old_limit, new_limit;
1004           struct bufferevent_private *bevp;
1005           BEV_LOCK(bev);
1006           bevp = BEV_UPCAST(bev);
1007           EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
1008           old_limit = bevp->rate_limiting->limit.write_limit;
1009 
1010           new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
1011           if (old_limit > 0 && new_limit <= 0) {
1012                     bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
1013                     if (event_add(&bevp->rate_limiting->refill_bucket_event,
1014                               &bevp->rate_limiting->cfg->tick_timeout) < 0)
1015                               r = -1;
1016           } else if (old_limit <= 0 && new_limit > 0) {
1017                     if (!(bevp->read_suspended & BEV_SUSPEND_BW))
1018                               event_del(&bevp->rate_limiting->refill_bucket_event);
1019                     bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
1020           }
1021 
1022           BEV_UNLOCK(bev);
1023           return r;
1024 }
1025 
1026 int
bufferevent_rate_limit_group_decrement_read(struct bufferevent_rate_limit_group * grp,ev_ssize_t decr)1027 bufferevent_rate_limit_group_decrement_read(
1028           struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1029 {
1030           int r = 0;
1031           ev_ssize_t old_limit, new_limit;
1032           LOCK_GROUP(grp);
1033           old_limit = grp->rate_limit.read_limit;
1034           new_limit = (grp->rate_limit.read_limit -= decr);
1035 
1036           if (old_limit > 0 && new_limit <= 0) {
1037                     bev_group_suspend_reading_(grp);
1038           } else if (old_limit <= 0 && new_limit > 0) {
1039                     bev_group_unsuspend_reading_(grp);
1040           }
1041 
1042           UNLOCK_GROUP(grp);
1043           return r;
1044 }
1045 
1046 int
bufferevent_rate_limit_group_decrement_write(struct bufferevent_rate_limit_group * grp,ev_ssize_t decr)1047 bufferevent_rate_limit_group_decrement_write(
1048           struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1049 {
1050           int r = 0;
1051           ev_ssize_t old_limit, new_limit;
1052           LOCK_GROUP(grp);
1053           old_limit = grp->rate_limit.write_limit;
1054           new_limit = (grp->rate_limit.write_limit -= decr);
1055 
1056           if (old_limit > 0 && new_limit <= 0) {
1057                     bev_group_suspend_writing_(grp);
1058           } else if (old_limit <= 0 && new_limit > 0) {
1059                     bev_group_unsuspend_writing_(grp);
1060           }
1061 
1062           UNLOCK_GROUP(grp);
1063           return r;
1064 }
1065 
1066 void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group * grp,ev_uint64_t * total_read_out,ev_uint64_t * total_written_out)1067 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1068     ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1069 {
1070           EVUTIL_ASSERT(grp != NULL);
1071           if (total_read_out)
1072                     *total_read_out = grp->total_read;
1073           if (total_written_out)
1074                     *total_written_out = grp->total_written;
1075 }
1076 
1077 void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group * grp)1078 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1079 {
1080           grp->total_read = grp->total_written = 0;
1081 }
1082 
1083 int
bufferevent_ratelim_init_(struct bufferevent_private * bev)1084 bufferevent_ratelim_init_(struct bufferevent_private *bev)
1085 {
1086           bev->rate_limiting = NULL;
1087           bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1088           bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1089 
1090           return 0;
1091 }
1092