hyper_util/client/legacy/
pool.rs

1#![allow(dead_code)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::convert::Infallible;
5use std::error::Error as StdError;
6use std::fmt::{self, Debug};
7use std::future::Future;
8use std::hash::Hash;
9use std::ops::{Deref, DerefMut};
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, Weak};
12use std::task::{self, ready, Poll};
13
14use std::time::{Duration, Instant};
15
16use futures_channel::oneshot;
17use tracing::{debug, trace};
18
19use hyper::rt::Timer as _;
20
21use crate::common::{exec, exec::Exec, timer::Timer};
22
// FIXME: allow() required due to `impl Trait` leaking types to this lint
/// A pool of idle connections, keyed by `K`.
///
/// Clones share the same inner state behind an `Arc<Mutex<_>>`.
#[allow(missing_debug_implementations)]
pub struct Pool<T, K: Key> {
    // If the pool is disabled, this is None.
    // (Disabled = `Config::is_enabled()` was false at construction.)
    inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
}
29
// Before using a pooled connection, make sure the sender is not dead.
//
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub trait Poolable: Unpin + Send + Sized + 'static {
    /// Whether the connection is still usable (not closed).
    fn is_open(&self) -> bool;
    /// Reserve this connection.
    ///
    /// Allows for HTTP/2 to return a shared reservation.
    fn reserve(self) -> Reservation<Self>;
    /// Whether the connection can serve multiple requests concurrently
    /// (i.e. may produce `Reservation::Shared`).
    fn can_share(&self) -> bool;
}
43
/// Marker trait for pool keys: hashable, cloneable, debuggable, sendable.
pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}

// Blanket impl: any type satisfying the bounds is automatically a `Key`.
impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
47
/// A marker to identify what version a pooled connection is.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[allow(dead_code)]
pub enum Ver {
    /// Version not yet known (negotiated at connect time).
    Auto,
    /// Known HTTP/2, so a single connection may be shared.
    Http2,
}
55
/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
pub enum Reservation<T> {
    /// This connection could be used multiple times, the first one will be
    /// reinserted into the `idle` pool, and the second will be given to
    /// the `Checkout`.
    #[cfg(feature = "http2")]
    Shared(T, T),
    /// This connection requires unique access. It will be returned after
    /// use is complete.
    Unique(T),
}
73
74/// Simple type alias in case the key type needs to be adjusted.
75// pub type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
76
struct PoolInner<T, K: Eq + Hash> {
    // A flag that a connection is being established, and the connection
    // should be shared. This prevents making multiple HTTP/2 connections
    // to the same host.
    connecting: HashSet<K>,
    // These are internal Conns sitting in the event loop in the KeepAlive
    // state, waiting to receive a new Request to send on the socket.
    idle: HashMap<K, Vec<Idle<T>>>,
    // Cap on idle connections stored per key (`Config::max_idle_per_host`).
    max_idle_per_host: usize,
    // These are outstanding Checkouts that are waiting for a socket to be
    // able to send a Request one. This is used when "racing" for a new
    // connection.
    //
    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
    // for the Pool to receive an idle Conn. When a Conn becomes idle,
    // this list is checked for any parked Checkouts, and tries to notify
    // them that the Conn could be used instead of waiting for a brand new
    // connection.
    waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
    // A oneshot channel is used to allow the interval to be notified when
    // the Pool completely drops. That way, the interval can cancel immediately.
    idle_interval_ref: Option<oneshot::Sender<Infallible>>,
    // Executor used to spawn the background `IdleTask`.
    exec: Exec,
    // Timer for background eviction; without one, no interval is spawned.
    timer: Option<Timer>,
    // Idle timeout (`Config::idle_timeout`); `None` disables expiry.
    timeout: Option<Duration>,
}
103
// This is because `Weak::new()` *allocates* space for `T`, even if it
// doesn't need it!
// `WeakOpt(None)` is the allocation-free stand-in for "no reference".
struct WeakOpt<T>(Option<Weak<T>>);
107
/// Pool configuration.
#[derive(Clone, Copy, Debug)]
pub struct Config {
    /// How long a connection may sit idle before eviction; `None` means
    /// idle connections never expire.
    pub idle_timeout: Option<Duration>,
    /// Maximum idle connections kept per key; `0` disables the pool.
    pub max_idle_per_host: usize,
}
113
114impl Config {
115    pub fn is_enabled(&self) -> bool {
116        self.max_idle_per_host > 0
117    }
118}
119
120impl<T, K: Key> Pool<T, K> {
121    pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
122    where
123        E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
124        M: hyper::rt::Timer + Send + Sync + Clone + 'static,
125    {
126        let exec = Exec::new(executor);
127        let timer = timer.map(|t| Timer::new(t));
128        let inner = if config.is_enabled() {
129            Some(Arc::new(Mutex::new(PoolInner {
130                connecting: HashSet::new(),
131                idle: HashMap::new(),
132                idle_interval_ref: None,
133                max_idle_per_host: config.max_idle_per_host,
134                waiters: HashMap::new(),
135                exec,
136                timer,
137                timeout: config.idle_timeout,
138            })))
139        } else {
140            None
141        };
142
143        Pool { inner }
144    }
145
146    pub(crate) fn is_enabled(&self) -> bool {
147        self.inner.is_some()
148    }
149
150    #[cfg(test)]
151    pub(super) fn no_timer(&self) {
152        // Prevent an actual interval from being created for this pool...
153        {
154            let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
155            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
156            let (tx, _) = oneshot::channel();
157            inner.idle_interval_ref = Some(tx);
158        }
159    }
160}
161
impl<T: Poolable, K: Key> Pool<T, K> {
    /// Returns a `Checkout` which is a future that resolves if an idle
    /// connection becomes available.
    pub fn checkout(&self, key: K) -> Checkout<T, K> {
        Checkout {
            key,
            pool: self.clone(),
            waiter: None,
        }
    }

    /// Ensure that there is only ever 1 connecting task for HTTP/2
    /// connections. This does nothing for HTTP/1.
    ///
    /// Returns `None` when an HTTP/2 connect for `key` is already in flight.
    pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
        if ver == Ver::Http2 {
            if let Some(ref enabled) = self.inner {
                let mut inner = enabled.lock().unwrap();
                return if inner.connecting.insert(key.clone()) {
                    let connecting = Connecting {
                        key: key.clone(),
                        // Weak pool ref so Drop can clear the `connecting`
                        // entry even if the connect task never completes.
                        pool: WeakOpt::downgrade(enabled),
                    };
                    Some(connecting)
                } else {
                    trace!("HTTP/2 connecting already in progress for {:?}", key);
                    None
                };
            }
        }

        // else
        Some(Connecting {
            key: key.clone(),
            // in HTTP/1's case, there is never a lock, so we don't
            // need to do anything in Drop.
            pool: WeakOpt::none(),
        })
    }

    #[cfg(test)]
    fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T, K>> {
        self.inner.as_ref().expect("enabled").lock().expect("lock")
    }

    /* Used in client/tests.rs...
    #[cfg(test)]
    pub(super) fn h1_key(&self, s: &str) -> Key {
        Arc::new(s.to_string())
    }

    #[cfg(test)]
    pub(super) fn idle_count(&self, key: &Key) -> usize {
        self
            .locked()
            .idle
            .get(key)
            .map(|list| list.len())
            .unwrap_or(0)
    }
    */

    /// Wrap a newly-established connection for use, inserting a shared
    /// copy into the pool when the reservation allows it.
    pub fn pooled(
        &self,
        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T, K>,
        value: T,
    ) -> Pooled<T, K> {
        let (value, pool_ref) = if let Some(ref enabled) = self.inner {
            match value.reserve() {
                #[cfg(feature = "http2")]
                Reservation::Shared(to_insert, to_return) => {
                    let mut inner = enabled.lock().unwrap();
                    inner.put(connecting.key.clone(), to_insert, enabled);
                    // Do this here instead of Drop for Connecting because we
                    // already have a lock, no need to lock the mutex twice.
                    inner.connected(&connecting.key);
                    // prevent the Drop of Connecting from repeating inner.connected()
                    connecting.pool = WeakOpt::none();

                    // Shared reservations don't need a reference to the pool,
                    // since the pool always keeps a copy.
                    (to_return, WeakOpt::none())
                }
                Reservation::Unique(value) => {
                    // Unique reservations must take a reference to the pool
                    // since they hope to reinsert once the reservation is
                    // completed
                    (value, WeakOpt::downgrade(enabled))
                }
            }
        } else {
            // If pool is not enabled, skip all the things...

            // The Connecting should have had no pool ref
            debug_assert!(connecting.pool.upgrade().is_none());

            (value, WeakOpt::none())
        };
        Pooled {
            key: connecting.key.clone(),
            is_reused: false,
            pool: pool_ref,
            value: Some(value),
        }
    }

    /// Wrap an idle value taken back out of the pool for reuse.
    fn reuse(&self, key: &K, value: T) -> Pooled<T, K> {
        debug!("reuse idle connection for {:?}", key);
        // TODO: unhack this
        // In Pool::pooled(), which is used for inserting brand new connections,
        // there's some code that adjusts the pool reference taken depending
        // on if the Reservation can be shared or is unique. By the time
        // reuse() is called, the reservation has already been made, and
        // we just have the final value, without knowledge of if this is
        // unique or shared. So, the hack is to just assume Ver::Http2 means
        // shared... :(
        let mut pool_ref = WeakOpt::none();
        if !value.can_share() {
            if let Some(ref enabled) = self.inner {
                pool_ref = WeakOpt::downgrade(enabled);
            }
        }

        Pooled {
            is_reused: true,
            key: key.clone(),
            pool: pool_ref,
            value: Some(value),
        }
    }
}
292
/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, T, K> {
    /// Pool key, used only for trace logging here.
    key: &'a K,
    /// The idle list for `key`; newest entries are at the back.
    list: &'a mut Vec<Idle<T>>,
}
298
impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
    /// Pop the newest usable entry, discarding closed or expired entries
    /// encountered along the way.
    ///
    /// A `Shared` reservation re-inserts one copy (stamped with `now`)
    /// before handing the other to the caller.
    fn pop(self, expiration: &Expiration, now: Instant) -> Option<Idle<T>> {
        while let Some(entry) = self.list.pop() {
            // If the connection has been closed, or is older than our idle
            // timeout, simply drop it and keep looking...
            if !entry.value.is_open() {
                trace!("removing closed connection for {:?}", self.key);
                continue;
            }
            // TODO: Actually, since the `idle` list is pushed to the end always,
            // that would imply that if *this* entry is expired, then anything
            // "earlier" in the list would *have* to be expired also... Right?
            //
            // In that case, we could just break out of the loop and drop the
            // whole list...
            if expiration.expires(entry.idle_at, now) {
                trace!("removing expired connection for {:?}", self.key);
                continue;
            }

            let value = match entry.value.reserve() {
                #[cfg(feature = "http2")]
                Reservation::Shared(to_reinsert, to_checkout) => {
                    self.list.push(Idle {
                        idle_at: now,
                        value: to_reinsert,
                    });
                    to_checkout
                }
                Reservation::Unique(unique) => unique,
            };

            return Some(Idle {
                idle_at: entry.idle_at,
                value,
            });
        }

        None
    }
}
340
impl<T: Poolable, K: Key> PoolInner<T, K> {
    /// Current instant, taken from the configured timer when present
    /// (lets a custom timer control the clock), else the system clock.
    fn now(&self) -> Instant {
        self.timer
            .as_ref()
            .map_or_else(|| Instant::now(), |t| t.now())
    }

    /// Return `value` to the pool for `key`.
    ///
    /// First offers it to any parked `Checkout` waiters; whatever remains
    /// is stored in the idle list, subject to `max_idle_per_host`.
    fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
        if value.can_share() && self.idle.contains_key(&key) {
            trace!("put; existing idle HTTP/2 connection for {:?}", key);
            return;
        }
        trace!("put; add idle connection for {:?}", key);
        let mut remove_waiters = false;
        // Held in an Option so it can be handed to a waiter and reclaimed
        // if the send fails.
        let mut value = Some(value);
        if let Some(waiters) = self.waiters.get_mut(&key) {
            while let Some(tx) = waiters.pop_front() {
                if !tx.is_canceled() {
                    let reserved = value.take().expect("value already sent");
                    let reserved = match reserved.reserve() {
                        #[cfg(feature = "http2")]
                        Reservation::Shared(to_keep, to_send) => {
                            value = Some(to_keep);
                            to_send
                        }
                        Reservation::Unique(uniq) => uniq,
                    };
                    match tx.send(reserved) {
                        Ok(()) => {
                            if value.is_none() {
                                break;
                            } else {
                                // Shared value: keep serving more waiters.
                                continue;
                            }
                        }
                        Err(e) => {
                            // Receiver dropped between is_canceled() and
                            // send(); the send error hands the value back.
                            value = Some(e);
                        }
                    }
                }

                trace!("put; removing canceled waiter for {:?}", key);
            }
            remove_waiters = waiters.is_empty();
        }
        if remove_waiters {
            self.waiters.remove(&key);
        }

        match value {
            Some(value) => {
                // borrow-check scope...
                {
                    let now = self.now();
                    let idle_list = self.idle.entry(key.clone()).or_default();
                    if self.max_idle_per_host <= idle_list.len() {
                        trace!("max idle per host for {:?}, dropping connection", key);
                        return;
                    }

                    debug!("pooling idle connection for {:?}", key);
                    idle_list.push(Idle {
                        value,
                        idle_at: now,
                    });
                }

                self.spawn_idle_interval(__pool_ref);
            }
            None => trace!("put; found waiter for {:?}", key),
        }
    }

    /// A `Connecting` task is complete. Not necessarily successfully,
    /// but the lock is going away, so clean up.
    fn connected(&mut self, key: &K) {
        let existed = self.connecting.remove(key);
        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
        // cancel any waiters. if there are any, it's because
        // this Connecting task didn't complete successfully.
        // those waiters would never receive a connection.
        self.waiters.remove(key);
    }

    /// Spawn the background eviction task — at most once per pool, and
    /// only when both an idle timeout and a timer are configured.
    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
        if self.idle_interval_ref.is_some() {
            return;
        }
        let dur = if let Some(dur) = self.timeout {
            dur
        } else {
            return;
        };
        if dur == Duration::ZERO {
            return;
        }
        let timer = if let Some(timer) = self.timer.clone() {
            timer
        } else {
            return;
        };

        // While someone might want a shorter duration, and it will be respected
        // at checkout time, there's no need to wake up and proactively evict
        // faster than this.
        const MIN_CHECK: Duration = Duration::from_millis(90);

        let dur = dur.max(MIN_CHECK);

        let (tx, rx) = oneshot::channel();
        self.idle_interval_ref = Some(tx);

        let interval = IdleTask {
            timer: timer.clone(),
            duration: dur,
            pool: WeakOpt::downgrade(pool_ref),
            pool_drop_notifier: rx,
        };

        self.exec.execute(interval.run());
    }
}
463
464impl<T, K: Eq + Hash> PoolInner<T, K> {
465    /// Any `FutureResponse`s that were created will have made a `Checkout`,
466    /// and possibly inserted into the pool that it is waiting for an idle
467    /// connection. If a user ever dropped that future, we need to clean out
468    /// those parked senders.
469    fn clean_waiters(&mut self, key: &K) {
470        let mut remove_waiters = false;
471        if let Some(waiters) = self.waiters.get_mut(key) {
472            waiters.retain(|tx| !tx.is_canceled());
473            remove_waiters = waiters.is_empty();
474        }
475        if remove_waiters {
476            self.waiters.remove(key);
477        }
478    }
479}
480
481impl<T: Poolable, K: Key> PoolInner<T, K> {
482    /// This should *only* be called by the IdleTask
483    fn clear_expired(&mut self) {
484        let dur = self.timeout.expect("interval assumes timeout");
485
486        let now = self.now();
487        //self.last_idle_check_at = now;
488
489        self.idle.retain(|key, values| {
490            values.retain(|entry| {
491                if !entry.value.is_open() {
492                    trace!("idle interval evicting closed for {:?}", key);
493                    return false;
494                }
495
496                // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
497                if now.saturating_duration_since(entry.idle_at) > dur {
498                    trace!("idle interval evicting expired for {:?}", key);
499                    return false;
500                }
501
502                // Otherwise, keep this value...
503                true
504            });
505
506            // returning false evicts this key/val
507            !values.is_empty()
508        });
509    }
510}
511
512impl<T, K: Key> Clone for Pool<T, K> {
513    fn clone(&self) -> Pool<T, K> {
514        Pool {
515            inner: self.inner.clone(),
516        }
517    }
518}
519
/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bounds `T: Poolable` is needed for the Drop impl.
pub struct Pooled<T: Poolable, K: Key> {
    // Always `Some` until Drop takes it.
    value: Option<T>,
    // True when handed out via `Pool::reuse` rather than freshly connected.
    is_reused: bool,
    key: K,
    // Weak, so a live connection doesn't keep a dropped pool alive.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}
528
impl<T: Poolable, K: Key> Pooled<T, K> {
    /// Whether this connection came from the idle pool rather than being
    /// freshly established (set by `Pool::reuse`).
    pub fn is_reused(&self) -> bool {
        self.is_reused
    }

    /// Whether this value holds a (weak) reference back to an enabled pool.
    pub fn is_pool_enabled(&self) -> bool {
        self.pool.0.is_some()
    }

    // `value` is only `None` once Drop has taken it, so these expects
    // cannot fire during normal use.
    fn as_ref(&self) -> &T {
        self.value.as_ref().expect("not dropped")
    }

    fn as_mut(&mut self) -> &mut T {
        self.value.as_mut().expect("not dropped")
    }
}
546
547impl<T: Poolable, K: Key> Deref for Pooled<T, K> {
548    type Target = T;
549    fn deref(&self) -> &T {
550        self.as_ref()
551    }
552}
553
554impl<T: Poolable, K: Key> DerefMut for Pooled<T, K> {
555    fn deref_mut(&mut self) -> &mut T {
556        self.as_mut()
557    }
558}
559
impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            if !value.is_open() {
                // If we *already* know the connection is done here,
                // it shouldn't be re-inserted back into the pool.
                return;
            }

            if let Some(pool) = self.pool.upgrade() {
                // A poisoned lock means another thread panicked inside the
                // pool; silently dropping the connection is the safe choice.
                if let Ok(mut inner) = pool.lock() {
                    inner.put(self.key.clone(), value, &pool);
                }
            } else if !value.can_share() {
                trace!("pool dropped, dropping pooled ({:?})", self.key);
            }
            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
            // have an actual reference to the Pool.
        }
    }
}
581
582impl<T: Poolable, K: Key> fmt::Debug for Pooled<T, K> {
583    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
584        f.debug_struct("Pooled").field("key", &self.key).finish()
585    }
586}
587
/// An idle connection plus the instant it went idle, for expiry checks.
struct Idle<T> {
    /// When the value was (re)inserted into the pool.
    idle_at: Instant,
    /// The pooled connection itself.
    value: T,
}
592
// FIXME: allow() required due to `impl Trait` leaking types to this lint
/// A future resolving to a pooled connection for `key`, either by taking
/// an idle one or by being handed one through a parked waiter.
#[allow(missing_debug_implementations)]
pub struct Checkout<T, K: Key> {
    key: K,
    pool: Pool<T, K>,
    // Receiver registered in the pool's `waiters` once no idle connection
    // was found; `None` until then.
    waiter: Option<oneshot::Receiver<T>>,
}
600
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
    /// The pool was configured with `max_idle_per_host == 0`, so checkouts
    /// can never succeed.
    PoolDisabled,
    /// The waiter's sender was dropped before any connection was sent
    /// (displayed as "request was canceled").
    CheckoutNoLongerWanted,
    /// A waiter received a connection, but it was already closed.
    CheckedOutClosedValue,
}
608
609impl Error {
610    pub(super) fn is_canceled(&self) -> bool {
611        matches!(self, Error::CheckedOutClosedValue)
612    }
613}
614
615impl fmt::Display for Error {
616    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617        f.write_str(match self {
618            Error::PoolDisabled => "pool is disabled",
619            Error::CheckedOutClosedValue => "checked out connection was closed",
620            Error::CheckoutNoLongerWanted => "request was canceled",
621        })
622    }
623}
624
// Marker impl so `Error` can be boxed as `Box<dyn std::error::Error>`;
// the message comes from the `Display` impl.
impl StdError for Error {}
626
impl<T: Poolable, K: Key> Checkout<T, K> {
    /// Poll the parked waiter, if one was registered.
    ///
    /// `Some(Ok(_))` when the pool handed over an open connection,
    /// `Some(Err(_))` when the received value was closed or the sender
    /// went away, `None` when no waiter has been registered yet.
    fn poll_waiter(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<Result<Pooled<T, K>, Error>>> {
        if let Some(mut rx) = self.waiter.take() {
            match Pin::new(&mut rx).poll(cx) {
                Poll::Ready(Ok(value)) => {
                    if value.is_open() {
                        Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
                    } else {
                        Poll::Ready(Some(Err(Error::CheckedOutClosedValue)))
                    }
                }
                Poll::Pending => {
                    // Not ready yet: park the receiver again.
                    self.waiter = Some(rx);
                    Poll::Pending
                }
                Poll::Ready(Err(_canceled)) => {
                    Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted)))
                }
            }
        } else {
            Poll::Ready(None)
        }
    }

    /// Try to take an idle connection now; if none is available, register
    /// a waiter so `PoolInner::put` can hand one over later. Returns
    /// `None` when the pool is disabled or nothing usable was idle.
    fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
        let entry = {
            let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
            let expiration = Expiration::new(inner.timeout);
            let now = inner.now();
            let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
                trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
                // A block to end the mutable borrow on list,
                // so the map below can check is_empty()
                {
                    let popper = IdlePopper {
                        key: &self.key,
                        list,
                    };
                    popper.pop(&expiration, now)
                }
                .map(|e| (e, list.is_empty()))
            });

            let (entry, empty) = if let Some((e, empty)) = maybe_entry {
                (Some(e), empty)
            } else {
                // No entry found means nuke the list for sure.
                (None, true)
            };
            if empty {
                //TODO: This could be done with the HashMap::entry API instead.
                inner.idle.remove(&self.key);
            }

            if entry.is_none() && self.waiter.is_none() {
                let (tx, mut rx) = oneshot::channel();
                trace!("checkout waiting for idle connection: {:?}", self.key);
                inner
                    .waiters
                    .entry(self.key.clone())
                    .or_insert_with(VecDeque::new)
                    .push_back(tx);

                // register the waker with this oneshot
                assert!(Pin::new(&mut rx).poll(cx).is_pending());
                self.waiter = Some(rx);
            }

            entry
        };

        entry.map(|e| self.pool.reuse(&self.key, e.value))
    }
}
704
impl<T: Poolable, K: Key> Future for Checkout<T, K> {
    type Output = Result<Pooled<T, K>, Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // First, see if a parked waiter has been handed a connection.
        // `?` propagates the waiter's error; `ready!` keeps returning
        // Pending while a registered waiter is still outstanding.
        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
            return Poll::Ready(Ok(pooled));
        }

        if let Some(pooled) = self.checkout(cx) {
            Poll::Ready(Ok(pooled))
        } else if !self.pool.is_enabled() {
            Poll::Ready(Err(Error::PoolDisabled))
        } else {
            // There's a new waiter, already registered in self.checkout()
            debug_assert!(self.waiter.is_some());
            Poll::Pending
        }
    }
}
724
725impl<T, K: Key> Drop for Checkout<T, K> {
726    fn drop(&mut self) {
727        if self.waiter.take().is_some() {
728            trace!("checkout dropped for {:?}", self.key);
729            if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
730                inner.clean_waiters(&self.key);
731            }
732        }
733    }
734}
735
// FIXME: allow() required due to `impl Trait` leaking types to this lint
/// A guard representing an in-flight connect for `key`.
///
/// For HTTP/2 it holds a weak pool reference so the pool's `connecting`
/// set is cleared on drop; for HTTP/1 the reference is empty.
#[allow(missing_debug_implementations)]
pub struct Connecting<T: Poolable, K: Key> {
    key: K,
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
}
742
impl<T: Poolable, K: Key> Connecting<T, K> {
    /// Upgrade this connect to HTTP/2 (e.g. after ALPN negotiation).
    ///
    /// Returns `None` if another HTTP/2 connect for the same key is
    /// already in flight (see `Pool::connecting`).
    pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
        debug_assert!(
            self.pool.0.is_none(),
            "Connecting::alpn_h2 but already Http2"
        );

        pool.connecting(&self.key, Ver::Http2)
    }
}
753
impl<T: Poolable, K: Key> Drop for Connecting<T, K> {
    fn drop(&mut self) {
        // Only an HTTP/2 `Connecting` holds a pool reference (see
        // `Pool::connecting`); the upgrade fails if the pool is gone.
        if let Some(pool) = self.pool.upgrade() {
            // No need to panic on drop, that could abort!
            if let Ok(mut inner) = pool.lock() {
                inner.connected(&self.key);
            }
        }
    }
}
764
/// The idle timeout applied when popping entries; `None` disables expiry.
struct Expiration(Option<Duration>);

impl Expiration {
    fn new(dur: Option<Duration>) -> Expiration {
        Expiration(dur)
    }

    /// Whether a value idle since `instant` has outlived the timeout as
    /// of `now` (strictly greater-than).
    fn expires(&self, instant: Instant, now: Instant) -> bool {
        // Avoid `Instant::elapsed`/`sub` to avoid issues like rust-lang/rust#86470.
        self.0
            .map_or(false, |timeout| {
                now.saturating_duration_since(instant) > timeout
            })
    }
}
780
/// Background task that periodically evicts expired idle connections.
struct IdleTask<T, K: Key> {
    timer: Timer,
    // How often to wake and sweep.
    duration: Duration,
    // Weak: the task must not keep the pool alive on its own.
    pool: WeakOpt<Mutex<PoolInner<T, K>>>,
    // This allows the IdleTask to be notified as soon as the entire
    // Pool is fully dropped, and shutdown. This channel is never sent on,
    // but Err(Canceled) will be received when the Pool is dropped.
    pool_drop_notifier: oneshot::Receiver<Infallible>,
}
790
791impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
792    async fn run(self) {
793        use futures_util::future;
794
795        let mut sleep = self.timer.sleep_until(self.timer.now() + self.duration);
796        let mut on_pool_drop = self.pool_drop_notifier;
797        loop {
798            match future::select(&mut on_pool_drop, &mut sleep).await {
799                future::Either::Left(_) => {
800                    // pool dropped, bah-bye
801                    break;
802                }
803                future::Either::Right(((), _)) => {
804                    if let Some(inner) = self.pool.upgrade() {
805                        if let Ok(mut inner) = inner.lock() {
806                            trace!("idle interval checking for expired");
807                            inner.clear_expired();
808                        }
809                    }
810
811                    let deadline = self.timer.now() + self.duration;
812                    self.timer.reset(&mut sleep, deadline);
813                }
814            }
815        }
816
817        trace!("pool closed, canceling idle interval");
818        return;
819    }
820}
821
822impl<T> WeakOpt<T> {
823    fn none() -> Self {
824        WeakOpt(None)
825    }
826
827    fn downgrade(arc: &Arc<T>) -> Self {
828        WeakOpt(Some(Arc::downgrade(arc)))
829    }
830
831    fn upgrade(&self) -> Option<Arc<T>> {
832        self.0.as_ref().and_then(Weak::upgrade)
833    }
834}
835
836#[cfg(test)]
837mod tests {
838    use std::fmt::Debug;
839    use std::future::Future;
840    use std::hash::Hash;
841    use std::pin::Pin;
842    use std::task::{self, Poll};
843    use std::time::Duration;
844
845    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
846    use crate::rt::{TokioExecutor, TokioTimer};
847
848    use crate::common::timer;
849
    /// Pool key used throughout these tests: scheme + authority.
    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    struct KeyImpl(http::uri::Scheme, http::uri::Authority);

    // The tuple form also satisfies the blanket `Key` impl.
    type KeyTuple = (http::uri::Scheme, http::uri::Authority);
854
    /// Test unique reservations.
    #[derive(Debug, PartialEq, Eq)]
    struct Uniq<T>(T);

    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
        // Always reports open; tests exercise expiry rather than closure.
        fn is_open(&self) -> bool {
            true
        }

        // HTTP/1-style: every reservation is exclusive.
        fn reserve(self) -> Reservation<Self> {
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }
872
    /// Build a `Connecting` with no pool reference, as HTTP/1 connects do.
    fn c<T: Poolable, K: Key>(key: K) -> Connecting<T, K> {
        Connecting {
            key,
            pool: WeakOpt::none(),
        }
    }
879
    /// Shorthand for an `http://` key with the given authority.
    fn host_key(s: &str) -> KeyImpl {
        KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key"))
    }
883
    /// A pool with unbounded idle connections and no background eviction.
    fn pool_no_timer<T, K: Key>() -> Pool<T, K> {
        pool_max_idle_no_timer(usize::MAX)
    }
887
    /// A pool with a 100ms idle timeout but the eviction interval disabled
    /// (`no_timer`), so expiry is only observed at checkout time.
    fn pool_max_idle_no_timer<T, K: Key>(max_idle: usize) -> Pool<T, K> {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(100)),
                max_idle_per_host: max_idle,
            },
            TokioExecutor::new(),
            Option::<timer::Timer>::None,
        );
        pool.no_timer();
        pool
    }
900
    #[tokio::test]
    async fn test_pool_checkout_smoke() {
        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        // Dropping the Pooled returns the open connection to the idle list.
        drop(pooled);

        match pool.checkout(key).await {
            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
            Err(_) => panic!("not ready"),
        };
    }
914
915    /// Helper to check if the future is ready after polling once.
916    struct PollOnce<'a, F>(&'a mut F);
917
918    impl<F, T, U> Future for PollOnce<'_, F>
919    where
920        F: Future<Output = Result<T, U>> + Unpin,
921    {
922        type Output = Option<()>;
923
924        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
925            match Pin::new(&mut self.0).poll(cx) {
926                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
927                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
928                Poll::Pending => Poll::Ready(None),
929            }
930        }
931    }
932
933    #[tokio::test]
934    async fn test_pool_checkout_returns_none_if_expired() {
935        let pool = pool_no_timer();
936        let key = host_key("foo");
937        let pooled = pool.pooled(c(key.clone()), Uniq(41));
938
939        drop(pooled);
940        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
941        let mut checkout = pool.checkout(key);
942        let poll_once = PollOnce(&mut checkout);
943        let is_not_ready = poll_once.await.is_none();
944        assert!(is_not_ready);
945    }
946
947    #[tokio::test]
948    async fn test_pool_checkout_removes_expired() {
949        let pool = pool_no_timer();
950        let key = host_key("foo");
951
952        pool.pooled(c(key.clone()), Uniq(41));
953        pool.pooled(c(key.clone()), Uniq(5));
954        pool.pooled(c(key.clone()), Uniq(99));
955
956        assert_eq!(
957            pool.locked().idle.get(&key).map(|entries| entries.len()),
958            Some(3)
959        );
960        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
961
962        let mut checkout = pool.checkout(key.clone());
963        let poll_once = PollOnce(&mut checkout);
964        // checkout.await should clean out the expired
965        poll_once.await;
966        assert!(!pool.locked().idle.contains_key(&key));
967    }
968
969    #[test]
970    fn test_pool_max_idle_per_host() {
971        let pool = pool_max_idle_no_timer(2);
972        let key = host_key("foo");
973
974        pool.pooled(c(key.clone()), Uniq(41));
975        pool.pooled(c(key.clone()), Uniq(5));
976        pool.pooled(c(key.clone()), Uniq(99));
977
978        // pooled and dropped 3, max_idle should only allow 2
979        assert_eq!(
980            pool.locked().idle.get(&key).map(|entries| entries.len()),
981            Some(2)
982        );
983    }
984
    /// Exercise the idle reaper against the real tokio clock.
    #[tokio::test]
    async fn test_pool_timer_removes_expired_realtime() {
        test_pool_timer_removes_expired_inner().await
    }
989
    /// Same scenario with tokio's clock paused, so the sleeps advance
    /// virtual time instead of waiting on wall-clock time.
    #[tokio::test(start_paused = true)]
    async fn test_pool_timer_removes_expired_faketime() {
        test_pool_timer_removes_expired_inner().await
    }
994
    /// Shared body for the reaper tests: with a real timer supplied, idle
    /// connections older than `idle_timeout` are removed in the background —
    /// but only once the reaper's minimum tick interval has also passed
    /// (per the existing comments, that interval is larger than the 10ms
    /// idle timeout used here).
    async fn test_pool_timer_removes_expired_inner() {
        let pool = Pool::new(
            super::Config {
                idle_timeout: Some(Duration::from_millis(10)),
                max_idle_per_host: usize::MAX,
            },
            TokioExecutor::new(),
            // Unlike the other tests, a real timer is provided so the
            // background reaping task actually runs.
            Some(TokioTimer::new()),
        );

        let key = host_key("foo");

        // Park three idle connections (the returned guards drop immediately).
        pool.pooled(c(key.clone()), Uniq(41));
        pool.pooled(c(key.clone()), Uniq(5));
        pool.pooled(c(key.clone()), Uniq(99));

        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        // Let the timer tick passed the expiration...
        tokio::time::sleep(Duration::from_millis(30)).await;

        // But minimum interval is higher, so nothing should have been reaped
        assert_eq!(
            pool.locked().idle.get(&key).map(|entries| entries.len()),
            Some(3)
        );

        // Now wait passed the minimum interval more
        tokio::time::sleep(Duration::from_millis(70)).await;
        // Yield in case other task hasn't been able to run :shrug:
        tokio::task::yield_now().await;

        assert!(!pool.locked().idle.contains_key(&key));
    }
1032
    /// Returning a connection to the pool while a checkout is parked must
    /// wake that checkout so it can claim the connection. The test relies
    /// on `join` polling its two futures in order.
    #[tokio::test]
    async fn test_pool_checkout_task_unparked() {
        use futures_util::future::join;
        use futures_util::FutureExt;

        let pool = pool_no_timer();
        let key = host_key("foo");
        let pooled = pool.pooled(c(key.clone()), Uniq(41));

        let checkout = join(pool.checkout(key), async {
            // the checkout future will park first,
            // and then this lazy future will be polled, which will insert
            // the pooled back into the pool
            //
            // this test makes sure that doing so will unpark the checkout
            drop(pooled);
        })
        .map(|(entry, _)| entry);

        assert_eq!(*checkout.await.unwrap(), Uniq(41));
    }
1054
1055    #[tokio::test]
1056    async fn test_pool_checkout_drop_cleans_up_waiters() {
1057        let pool = pool_no_timer::<Uniq<i32>, KeyImpl>();
1058        let key = host_key("foo");
1059
1060        let mut checkout1 = pool.checkout(key.clone());
1061        let mut checkout2 = pool.checkout(key.clone());
1062
1063        let poll_once1 = PollOnce(&mut checkout1);
1064        let poll_once2 = PollOnce(&mut checkout2);
1065
1066        // first poll needed to get into Pool's parked
1067        poll_once1.await;
1068        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1069        poll_once2.await;
1070        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
1071
1072        // on drop, clean up Pool
1073        drop(checkout1);
1074        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1075
1076        drop(checkout2);
1077        assert!(!pool.locked().waiters.contains_key(&key));
1078    }
1079
    /// A poolable connection whose openness is controlled by a flag, used
    /// to test that closed connections are never re-inserted into the pool.
    #[derive(Debug)]
    struct CanClose {
        #[allow(unused)]
        val: i32,
        // When true, `is_open` reports the connection as dead.
        closed: bool,
    }

    impl Poolable for CanClose {
        fn is_open(&self) -> bool {
            !self.closed
        }

        fn reserve(self) -> Reservation<Self> {
            // Exclusive, HTTP/1-style reservation (see `can_share`).
            Reservation::Unique(self)
        }

        fn can_share(&self) -> bool {
            false
        }
    }
1100
1101    #[test]
1102    fn pooled_drop_if_closed_doesnt_reinsert() {
1103        let pool = pool_no_timer();
1104        let key = host_key("foo");
1105        pool.pooled(
1106            c(key.clone()),
1107            CanClose {
1108                val: 57,
1109                closed: true,
1110            },
1111        );
1112
1113        assert!(!pool.locked().idle.contains_key(&key));
1114    }
1115}