tower/limit/rate/service.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
use super::Rate;
use futures_core::ready;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::time::{Instant, Sleep};
use tower_service::Service;
/// Enforces a rate limit on the number of requests the underlying
/// service can handle over a period of time.
#[derive(Debug)]
pub struct RateLimit<T> {
inner: T,
rate: Rate,
state: State,
sleep: Pin<Box<Sleep>>,
}
#[derive(Debug)]
enum State {
// The service has hit its limit
Limited,
Ready { until: Instant, rem: u64 },
}
impl<T> RateLimit<T> {
/// Create a new rate limiter
pub fn new(inner: T, rate: Rate) -> Self {
let until = Instant::now();
let state = State::Ready {
until,
rem: rate.num(),
};
RateLimit {
inner,
rate,
state,
// The sleep won't actually be used with this duration, but
// we create it eagerly so that we can reset it in place rather than
// `Box::pin`ning a new `Sleep` every time we need one.
sleep: Box::pin(tokio::time::sleep_until(until)),
}
}
/// Get a reference to the inner service
pub fn get_ref(&self) -> &T {
&self.inner
}
/// Get a mutable reference to the inner service
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
/// Consume `self`, returning the inner service
pub fn into_inner(self) -> T {
self.inner
}
}
impl<S, Request> Service<Request> for RateLimit<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.state {
State::Ready { .. } => return Poll::Ready(ready!(self.inner.poll_ready(cx))),
State::Limited => {
if Pin::new(&mut self.sleep).poll(cx).is_pending() {
tracing::trace!("rate limit exceeded; sleeping.");
return Poll::Pending;
}
}
}
self.state = State::Ready {
until: Instant::now() + self.rate.per(),
rem: self.rate.num(),
};
Poll::Ready(ready!(self.inner.poll_ready(cx)))
}
fn call(&mut self, request: Request) -> Self::Future {
match self.state {
State::Ready { mut until, mut rem } => {
let now = Instant::now();
// If the period has elapsed, reset it.
if now >= until {
until = now + self.rate.per();
rem = self.rate.num();
}
if rem > 1 {
rem -= 1;
self.state = State::Ready { until, rem };
} else {
// The service is disabled until further notice
// Reset the sleep future in place, so that we don't have to
// deallocate the existing box and allocate a new one.
self.sleep.as_mut().reset(until);
self.state = State::Limited;
}
// Call the inner future
self.inner.call(request)
}
State::Limited => panic!("service not ready; poll_ready must be called first"),
}
}
}
#[cfg(feature = "load")]
impl<S> crate::load::Load for RateLimit<S>
where
S: crate::load::Load,
{
type Metric = S::Metric;
fn load(&self) -> Self::Metric {
self.inner.load()
}
}