tower/balance/p2c/
service.rs

1use super::super::error;
2use crate::discover::{Change, Discover};
3use crate::load::Load;
4use crate::ready_cache::{error::Failed, ReadyCache};
5use crate::util::rng::{sample_floyd2, HasherRng, Rng};
6use futures_core::ready;
7use futures_util::future::{self, TryFutureExt};
8use std::hash::Hash;
9use std::marker::PhantomData;
10use std::{
11    fmt,
12    pin::Pin,
13    task::{Context, Poll},
14};
15use tower_service::Service;
16use tracing::{debug, trace};
17
18/// Efficiently distributes requests across an arbitrary number of services.
19///
20/// See the [module-level documentation](..) for details.
21///
22/// Note that [`Balance`] requires that the [`Discover`] you use is [`Unpin`] in order to implement
23/// [`Service`]. This is because it needs to be accessed from [`Service::poll_ready`], which takes
24/// `&mut self`. You can achieve this easily by wrapping your [`Discover`] in [`Box::pin`] before you
25/// construct the [`Balance`] instance. For more details, see [#319].
26///
27/// [`Box::pin`]: std::boxed::Box::pin()
28/// [#319]: https://github.com/tower-rs/tower/issues/319
29pub struct Balance<D, Req>
30where
31    D: Discover,
32    D::Key: Hash,
33{
34    discover: D,
35
36    services: ReadyCache<D::Key, D::Service, Req>,
37    ready_index: Option<usize>,
38
39    rng: Box<dyn Rng + Send + Sync>,
40
41    _req: PhantomData<Req>,
42}
43
44impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
45where
46    D: fmt::Debug,
47    D::Key: Hash + fmt::Debug,
48    D::Service: fmt::Debug,
49{
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        f.debug_struct("Balance")
52            .field("discover", &self.discover)
53            .field("services", &self.services)
54            .finish()
55    }
56}
57
58impl<D, Req> Balance<D, Req>
59where
60    D: Discover,
61    D::Key: Hash,
62    D::Service: Service<Req>,
63    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
64{
65    /// Constructs a load balancer that uses operating system entropy.
66    pub fn new(discover: D) -> Self {
67        Self::from_rng(discover, HasherRng::default())
68    }
69
70    /// Constructs a load balancer seeded with the provided random number generator.
71    pub fn from_rng<R: Rng + Send + Sync + 'static>(discover: D, rng: R) -> Self {
72        let rng = Box::new(rng);
73        Self {
74            rng,
75            discover,
76            services: ReadyCache::default(),
77            ready_index: None,
78
79            _req: PhantomData,
80        }
81    }
82
83    /// Returns the number of endpoints currently tracked by the balancer.
84    pub fn len(&self) -> usize {
85        self.services.len()
86    }
87
88    /// Returns whether or not the balancer is empty.
89    pub fn is_empty(&self) -> bool {
90        self.services.is_empty()
91    }
92}
93
94impl<D, Req> Balance<D, Req>
95where
96    D: Discover + Unpin,
97    D::Key: Hash + Clone,
98    D::Error: Into<crate::BoxError>,
99    D::Service: Service<Req> + Load,
100    <D::Service as Load>::Metric: std::fmt::Debug,
101    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
102{
103    /// Polls `discover` for updates, adding new items to `not_ready`.
104    ///
105    /// Removals may alter the order of either `ready` or `not_ready`.
106    fn update_pending_from_discover(
107        &mut self,
108        cx: &mut Context<'_>,
109    ) -> Poll<Option<Result<(), error::Discover>>> {
110        debug!("updating from discover");
111        loop {
112            match ready!(Pin::new(&mut self.discover).poll_discover(cx))
113                .transpose()
114                .map_err(|e| error::Discover(e.into()))?
115            {
116                None => return Poll::Ready(None),
117                Some(Change::Remove(key)) => {
118                    trace!("remove");
119                    self.services.evict(&key);
120                }
121                Some(Change::Insert(key, svc)) => {
122                    trace!("insert");
123                    // If this service already existed in the set, it will be
124                    // replaced as the new one becomes ready.
125                    self.services.push(key, svc);
126                }
127            }
128        }
129    }
130
131    fn promote_pending_to_ready(&mut self, cx: &mut Context<'_>) {
132        loop {
133            match self.services.poll_pending(cx) {
134                Poll::Ready(Ok(())) => {
135                    // There are no remaining pending services.
136                    debug_assert_eq!(self.services.pending_len(), 0);
137                    break;
138                }
139                Poll::Pending => {
140                    // None of the pending services are ready.
141                    debug_assert!(self.services.pending_len() > 0);
142                    break;
143                }
144                Poll::Ready(Err(error)) => {
145                    // An individual service was lost; continue processing
146                    // pending services.
147                    debug!(%error, "dropping failed endpoint");
148                }
149            }
150        }
151        trace!(
152            ready = %self.services.ready_len(),
153            pending = %self.services.pending_len(),
154            "poll_unready"
155        );
156    }
157
158    /// Performs P2C on inner services to find a suitable endpoint.
159    fn p2c_ready_index(&mut self) -> Option<usize> {
160        match self.services.ready_len() {
161            0 => None,
162            1 => Some(0),
163            len => {
164                // Get two distinct random indexes (in a random order) and
165                // compare the loads of the service at each index.
166                let [aidx, bidx] = sample_floyd2(&mut self.rng, len as u64);
167                debug_assert_ne!(aidx, bidx, "random indices must be distinct");
168
169                let aload = self.ready_index_load(aidx as usize);
170                let bload = self.ready_index_load(bidx as usize);
171                let chosen = if aload <= bload { aidx } else { bidx };
172
173                trace!(
174                    a.index = aidx,
175                    a.load = ?aload,
176                    b.index = bidx,
177                    b.load = ?bload,
178                    chosen = if chosen == aidx { "a" } else { "b" },
179                    "p2c",
180                );
181                Some(chosen as usize)
182            }
183        }
184    }
185
186    /// Accesses a ready endpoint by index and returns its current load.
187    fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
188        let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
189        svc.load()
190    }
191}
192
193impl<D, Req> Service<Req> for Balance<D, Req>
194where
195    D: Discover + Unpin,
196    D::Key: Hash + Clone,
197    D::Error: Into<crate::BoxError>,
198    D::Service: Service<Req> + Load,
199    <D::Service as Load>::Metric: std::fmt::Debug,
200    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
201{
202    type Response = <D::Service as Service<Req>>::Response;
203    type Error = crate::BoxError;
204    type Future = future::MapErr<
205        <D::Service as Service<Req>>::Future,
206        fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
207    >;
208
209    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210        // `ready_index` may have already been set by a prior invocation. These
211        // updates cannot disturb the order of existing ready services.
212        let _ = self.update_pending_from_discover(cx)?;
213        self.promote_pending_to_ready(cx);
214
215        loop {
216            // If a service has already been selected, ensure that it is ready.
217            // This ensures that the underlying service is ready immediately
218            // before a request is dispatched to it (i.e. in the same task
219            // invocation). If, e.g., a failure detector has changed the state
220            // of the service, it may be evicted from the ready set so that
221            // another service can be selected.
222            if let Some(index) = self.ready_index.take() {
223                match self.services.check_ready_index(cx, index) {
224                    Ok(true) => {
225                        // The service remains ready.
226                        self.ready_index = Some(index);
227                        return Poll::Ready(Ok(()));
228                    }
229                    Ok(false) => {
230                        // The service is no longer ready. Try to find a new one.
231                        trace!("ready service became unavailable");
232                    }
233                    Err(Failed(_, error)) => {
234                        // The ready endpoint failed, so log the error and try
235                        // to find a new one.
236                        debug!(%error, "endpoint failed");
237                    }
238                }
239            }
240
241            // Select a new service by comparing two at random and using the
242            // lesser-loaded service.
243            self.ready_index = self.p2c_ready_index();
244            if self.ready_index.is_none() {
245                debug_assert_eq!(self.services.ready_len(), 0);
246                // We have previously registered interest in updates from
247                // discover and pending services.
248                return Poll::Pending;
249            }
250        }
251    }
252
253    fn call(&mut self, request: Req) -> Self::Future {
254        let index = self.ready_index.take().expect("called before ready");
255        self.services
256            .call_ready_index(index, request)
257            .map_err(Into::into)
258    }
259}