1use super::super::error;
2use crate::discover::{Change, Discover};
3use crate::load::Load;
4use crate::ready_cache::{error::Failed, ReadyCache};
5use crate::util::rng::{sample_floyd2, HasherRng, Rng};
6use futures_core::ready;
7use futures_util::future::{self, TryFutureExt};
8use std::hash::Hash;
9use std::marker::PhantomData;
10use std::{
11 fmt,
12 pin::Pin,
13 task::{Context, Poll},
14};
15use tower_service::Service;
16use tracing::{debug, trace};
17
18pub struct Balance<D, Req>
30where
31 D: Discover,
32 D::Key: Hash,
33{
34 discover: D,
35
36 services: ReadyCache<D::Key, D::Service, Req>,
37 ready_index: Option<usize>,
38
39 rng: Box<dyn Rng + Send + Sync>,
40
41 _req: PhantomData<Req>,
42}
43
44impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
45where
46 D: fmt::Debug,
47 D::Key: Hash + fmt::Debug,
48 D::Service: fmt::Debug,
49{
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 f.debug_struct("Balance")
52 .field("discover", &self.discover)
53 .field("services", &self.services)
54 .finish()
55 }
56}
57
58impl<D, Req> Balance<D, Req>
59where
60 D: Discover,
61 D::Key: Hash,
62 D::Service: Service<Req>,
63 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
64{
65 pub fn new(discover: D) -> Self {
67 Self::from_rng(discover, HasherRng::default())
68 }
69
70 pub fn from_rng<R: Rng + Send + Sync + 'static>(discover: D, rng: R) -> Self {
72 let rng = Box::new(rng);
73 Self {
74 rng,
75 discover,
76 services: ReadyCache::default(),
77 ready_index: None,
78
79 _req: PhantomData,
80 }
81 }
82
83 pub fn len(&self) -> usize {
85 self.services.len()
86 }
87
88 pub fn is_empty(&self) -> bool {
90 self.services.is_empty()
91 }
92}
93
94impl<D, Req> Balance<D, Req>
95where
96 D: Discover + Unpin,
97 D::Key: Hash + Clone,
98 D::Error: Into<crate::BoxError>,
99 D::Service: Service<Req> + Load,
100 <D::Service as Load>::Metric: std::fmt::Debug,
101 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
102{
103 fn update_pending_from_discover(
107 &mut self,
108 cx: &mut Context<'_>,
109 ) -> Poll<Option<Result<(), error::Discover>>> {
110 debug!("updating from discover");
111 loop {
112 match ready!(Pin::new(&mut self.discover).poll_discover(cx))
113 .transpose()
114 .map_err(|e| error::Discover(e.into()))?
115 {
116 None => return Poll::Ready(None),
117 Some(Change::Remove(key)) => {
118 trace!("remove");
119 self.services.evict(&key);
120 }
121 Some(Change::Insert(key, svc)) => {
122 trace!("insert");
123 self.services.push(key, svc);
126 }
127 }
128 }
129 }
130
131 fn promote_pending_to_ready(&mut self, cx: &mut Context<'_>) {
132 loop {
133 match self.services.poll_pending(cx) {
134 Poll::Ready(Ok(())) => {
135 debug_assert_eq!(self.services.pending_len(), 0);
137 break;
138 }
139 Poll::Pending => {
140 debug_assert!(self.services.pending_len() > 0);
142 break;
143 }
144 Poll::Ready(Err(error)) => {
145 debug!(%error, "dropping failed endpoint");
148 }
149 }
150 }
151 trace!(
152 ready = %self.services.ready_len(),
153 pending = %self.services.pending_len(),
154 "poll_unready"
155 );
156 }
157
158 fn p2c_ready_index(&mut self) -> Option<usize> {
160 match self.services.ready_len() {
161 0 => None,
162 1 => Some(0),
163 len => {
164 let [aidx, bidx] = sample_floyd2(&mut self.rng, len as u64);
167 debug_assert_ne!(aidx, bidx, "random indices must be distinct");
168
169 let aload = self.ready_index_load(aidx as usize);
170 let bload = self.ready_index_load(bidx as usize);
171 let chosen = if aload <= bload { aidx } else { bidx };
172
173 trace!(
174 a.index = aidx,
175 a.load = ?aload,
176 b.index = bidx,
177 b.load = ?bload,
178 chosen = if chosen == aidx { "a" } else { "b" },
179 "p2c",
180 );
181 Some(chosen as usize)
182 }
183 }
184 }
185
186 fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
188 let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
189 svc.load()
190 }
191}
192
193impl<D, Req> Service<Req> for Balance<D, Req>
194where
195 D: Discover + Unpin,
196 D::Key: Hash + Clone,
197 D::Error: Into<crate::BoxError>,
198 D::Service: Service<Req> + Load,
199 <D::Service as Load>::Metric: std::fmt::Debug,
200 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
201{
202 type Response = <D::Service as Service<Req>>::Response;
203 type Error = crate::BoxError;
204 type Future = future::MapErr<
205 <D::Service as Service<Req>>::Future,
206 fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
207 >;
208
209 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210 let _ = self.update_pending_from_discover(cx)?;
213 self.promote_pending_to_ready(cx);
214
215 loop {
216 if let Some(index) = self.ready_index.take() {
223 match self.services.check_ready_index(cx, index) {
224 Ok(true) => {
225 self.ready_index = Some(index);
227 return Poll::Ready(Ok(()));
228 }
229 Ok(false) => {
230 trace!("ready service became unavailable");
232 }
233 Err(Failed(_, error)) => {
234 debug!(%error, "endpoint failed");
237 }
238 }
239 }
240
241 self.ready_index = self.p2c_ready_index();
244 if self.ready_index.is_none() {
245 debug_assert_eq!(self.services.ready_len(), 0);
246 return Poll::Pending;
249 }
250 }
251 }
252
253 fn call(&mut self, request: Req) -> Self::Future {
254 let index = self.ready_index.take().expect("called before ready");
255 self.services
256 .call_ready_index(index, request)
257 .map_err(Into::into)
258 }
259}