1#![allow(dead_code)]
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::convert::Infallible;
5use std::error::Error as StdError;
6use std::fmt::{self, Debug};
7use std::future::Future;
8use std::hash::Hash;
9use std::ops::{Deref, DerefMut};
10use std::pin::Pin;
11use std::sync::{Arc, Mutex, Weak};
12use std::task::{self, ready, Poll};
13
14use std::time::{Duration, Instant};
15
16use futures_channel::oneshot;
17use tracing::{debug, trace};
18
19use hyper::rt::Timer as _;
20
21use crate::common::{exec, exec::Exec, timer::Timer};
22
23#[allow(missing_debug_implementations)]
25pub struct Pool<T, K: Key> {
26 inner: Option<Arc<Mutex<PoolInner<T, K>>>>,
28}
29
30pub trait Poolable: Unpin + Send + Sized + 'static {
36 fn is_open(&self) -> bool;
37 fn reserve(self) -> Reservation<Self>;
41 fn can_share(&self) -> bool;
42}
43
44pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
45
46impl<T> Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {}
47
48#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
50#[allow(dead_code)]
51pub enum Ver {
52 Auto,
53 Http2,
54}
55
56#[allow(missing_debug_implementations)]
63pub enum Reservation<T> {
64 #[cfg(feature = "http2")]
68 Shared(T, T),
69 Unique(T),
72}
73
74struct PoolInner<T, K: Eq + Hash> {
78 connecting: HashSet<K>,
82 idle: HashMap<K, Vec<Idle<T>>>,
85 max_idle_per_host: usize,
86 waiters: HashMap<K, VecDeque<oneshot::Sender<T>>>,
96 idle_interval_ref: Option<oneshot::Sender<Infallible>>,
99 exec: Exec,
100 timer: Option<Timer>,
101 timeout: Option<Duration>,
102}
103
104struct WeakOpt<T>(Option<Weak<T>>);
107
108#[derive(Clone, Copy, Debug)]
109pub struct Config {
110 pub idle_timeout: Option<Duration>,
111 pub max_idle_per_host: usize,
112}
113
114impl Config {
115 pub fn is_enabled(&self) -> bool {
116 self.max_idle_per_host > 0
117 }
118}
119
120impl<T, K: Key> Pool<T, K> {
121 pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
122 where
123 E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
124 M: hyper::rt::Timer + Send + Sync + Clone + 'static,
125 {
126 let exec = Exec::new(executor);
127 let timer = timer.map(|t| Timer::new(t));
128 let inner = if config.is_enabled() {
129 Some(Arc::new(Mutex::new(PoolInner {
130 connecting: HashSet::new(),
131 idle: HashMap::new(),
132 idle_interval_ref: None,
133 max_idle_per_host: config.max_idle_per_host,
134 waiters: HashMap::new(),
135 exec,
136 timer,
137 timeout: config.idle_timeout,
138 })))
139 } else {
140 None
141 };
142
143 Pool { inner }
144 }
145
146 pub(crate) fn is_enabled(&self) -> bool {
147 self.inner.is_some()
148 }
149
150 #[cfg(test)]
151 pub(super) fn no_timer(&self) {
152 {
154 let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
155 assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
156 let (tx, _) = oneshot::channel();
157 inner.idle_interval_ref = Some(tx);
158 }
159 }
160}
161
162impl<T: Poolable, K: Key> Pool<T, K> {
163 pub fn checkout(&self, key: K) -> Checkout<T, K> {
166 Checkout {
167 key,
168 pool: self.clone(),
169 waiter: None,
170 }
171 }
172
173 pub fn connecting(&self, key: &K, ver: Ver) -> Option<Connecting<T, K>> {
176 if ver == Ver::Http2 {
177 if let Some(ref enabled) = self.inner {
178 let mut inner = enabled.lock().unwrap();
179 return if inner.connecting.insert(key.clone()) {
180 let connecting = Connecting {
181 key: key.clone(),
182 pool: WeakOpt::downgrade(enabled),
183 };
184 Some(connecting)
185 } else {
186 trace!("HTTP/2 connecting already in progress for {:?}", key);
187 None
188 };
189 }
190 }
191
192 Some(Connecting {
194 key: key.clone(),
195 pool: WeakOpt::none(),
198 })
199 }
200
201 #[cfg(test)]
202 fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T, K>> {
203 self.inner.as_ref().expect("enabled").lock().expect("lock")
204 }
205
206 pub fn pooled(
224 &self,
225 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T, K>,
226 value: T,
227 ) -> Pooled<T, K> {
228 let (value, pool_ref) = if let Some(ref enabled) = self.inner {
229 match value.reserve() {
230 #[cfg(feature = "http2")]
231 Reservation::Shared(to_insert, to_return) => {
232 let mut inner = enabled.lock().unwrap();
233 inner.put(connecting.key.clone(), to_insert, enabled);
234 inner.connected(&connecting.key);
237 connecting.pool = WeakOpt::none();
239
240 (to_return, WeakOpt::none())
243 }
244 Reservation::Unique(value) => {
245 (value, WeakOpt::downgrade(enabled))
249 }
250 }
251 } else {
252 debug_assert!(connecting.pool.upgrade().is_none());
256
257 (value, WeakOpt::none())
258 };
259 Pooled {
260 key: connecting.key.clone(),
261 is_reused: false,
262 pool: pool_ref,
263 value: Some(value),
264 }
265 }
266
267 fn reuse(&self, key: &K, value: T) -> Pooled<T, K> {
268 debug!("reuse idle connection for {:?}", key);
269 let mut pool_ref = WeakOpt::none();
278 if !value.can_share() {
279 if let Some(ref enabled) = self.inner {
280 pool_ref = WeakOpt::downgrade(enabled);
281 }
282 }
283
284 Pooled {
285 is_reused: true,
286 key: key.clone(),
287 pool: pool_ref,
288 value: Some(value),
289 }
290 }
291}
292
293struct IdlePopper<'a, T, K> {
295 key: &'a K,
296 list: &'a mut Vec<Idle<T>>,
297}
298
299impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> {
300 fn pop(self, expiration: &Expiration, now: Instant) -> Option<Idle<T>> {
301 while let Some(entry) = self.list.pop() {
302 if !entry.value.is_open() {
305 trace!("removing closed connection for {:?}", self.key);
306 continue;
307 }
308 if expiration.expires(entry.idle_at, now) {
315 trace!("removing expired connection for {:?}", self.key);
316 continue;
317 }
318
319 let value = match entry.value.reserve() {
320 #[cfg(feature = "http2")]
321 Reservation::Shared(to_reinsert, to_checkout) => {
322 self.list.push(Idle {
323 idle_at: now,
324 value: to_reinsert,
325 });
326 to_checkout
327 }
328 Reservation::Unique(unique) => unique,
329 };
330
331 return Some(Idle {
332 idle_at: entry.idle_at,
333 value,
334 });
335 }
336
337 None
338 }
339}
340
341impl<T: Poolable, K: Key> PoolInner<T, K> {
342 fn now(&self) -> Instant {
343 self.timer
344 .as_ref()
345 .map_or_else(|| Instant::now(), |t| t.now())
346 }
347
348 fn put(&mut self, key: K, value: T, __pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
349 if value.can_share() && self.idle.contains_key(&key) {
350 trace!("put; existing idle HTTP/2 connection for {:?}", key);
351 return;
352 }
353 trace!("put; add idle connection for {:?}", key);
354 let mut remove_waiters = false;
355 let mut value = Some(value);
356 if let Some(waiters) = self.waiters.get_mut(&key) {
357 while let Some(tx) = waiters.pop_front() {
358 if !tx.is_canceled() {
359 let reserved = value.take().expect("value already sent");
360 let reserved = match reserved.reserve() {
361 #[cfg(feature = "http2")]
362 Reservation::Shared(to_keep, to_send) => {
363 value = Some(to_keep);
364 to_send
365 }
366 Reservation::Unique(uniq) => uniq,
367 };
368 match tx.send(reserved) {
369 Ok(()) => {
370 if value.is_none() {
371 break;
372 } else {
373 continue;
374 }
375 }
376 Err(e) => {
377 value = Some(e);
378 }
379 }
380 }
381
382 trace!("put; removing canceled waiter for {:?}", key);
383 }
384 remove_waiters = waiters.is_empty();
385 }
386 if remove_waiters {
387 self.waiters.remove(&key);
388 }
389
390 match value {
391 Some(value) => {
392 {
394 let now = self.now();
395 let idle_list = self.idle.entry(key.clone()).or_default();
396 if self.max_idle_per_host <= idle_list.len() {
397 trace!("max idle per host for {:?}, dropping connection", key);
398 return;
399 }
400
401 debug!("pooling idle connection for {:?}", key);
402 idle_list.push(Idle {
403 value,
404 idle_at: now,
405 });
406 }
407
408 self.spawn_idle_interval(__pool_ref);
409 }
410 None => trace!("put; found waiter for {:?}", key),
411 }
412 }
413
414 fn connected(&mut self, key: &K) {
417 let existed = self.connecting.remove(key);
418 debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
419 self.waiters.remove(key);
423 }
424
425 fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
426 if self.idle_interval_ref.is_some() {
427 return;
428 }
429 let dur = if let Some(dur) = self.timeout {
430 dur
431 } else {
432 return;
433 };
434 if dur == Duration::ZERO {
435 return;
436 }
437 let timer = if let Some(timer) = self.timer.clone() {
438 timer
439 } else {
440 return;
441 };
442
443 const MIN_CHECK: Duration = Duration::from_millis(90);
447
448 let dur = dur.max(MIN_CHECK);
449
450 let (tx, rx) = oneshot::channel();
451 self.idle_interval_ref = Some(tx);
452
453 let interval = IdleTask {
454 timer: timer.clone(),
455 duration: dur,
456 pool: WeakOpt::downgrade(pool_ref),
457 pool_drop_notifier: rx,
458 };
459
460 self.exec.execute(interval.run());
461 }
462}
463
464impl<T, K: Eq + Hash> PoolInner<T, K> {
465 fn clean_waiters(&mut self, key: &K) {
470 let mut remove_waiters = false;
471 if let Some(waiters) = self.waiters.get_mut(key) {
472 waiters.retain(|tx| !tx.is_canceled());
473 remove_waiters = waiters.is_empty();
474 }
475 if remove_waiters {
476 self.waiters.remove(key);
477 }
478 }
479}
480
481impl<T: Poolable, K: Key> PoolInner<T, K> {
482 fn clear_expired(&mut self) {
484 let dur = self.timeout.expect("interval assumes timeout");
485
486 let now = self.now();
487 self.idle.retain(|key, values| {
490 values.retain(|entry| {
491 if !entry.value.is_open() {
492 trace!("idle interval evicting closed for {:?}", key);
493 return false;
494 }
495
496 if now.saturating_duration_since(entry.idle_at) > dur {
498 trace!("idle interval evicting expired for {:?}", key);
499 return false;
500 }
501
502 true
504 });
505
506 !values.is_empty()
508 });
509 }
510}
511
512impl<T, K: Key> Clone for Pool<T, K> {
513 fn clone(&self) -> Pool<T, K> {
514 Pool {
515 inner: self.inner.clone(),
516 }
517 }
518}
519
520pub struct Pooled<T: Poolable, K: Key> {
523 value: Option<T>,
524 is_reused: bool,
525 key: K,
526 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
527}
528
529impl<T: Poolable, K: Key> Pooled<T, K> {
530 pub fn is_reused(&self) -> bool {
531 self.is_reused
532 }
533
534 pub fn is_pool_enabled(&self) -> bool {
535 self.pool.0.is_some()
536 }
537
538 fn as_ref(&self) -> &T {
539 self.value.as_ref().expect("not dropped")
540 }
541
542 fn as_mut(&mut self) -> &mut T {
543 self.value.as_mut().expect("not dropped")
544 }
545}
546
547impl<T: Poolable, K: Key> Deref for Pooled<T, K> {
548 type Target = T;
549 fn deref(&self) -> &T {
550 self.as_ref()
551 }
552}
553
554impl<T: Poolable, K: Key> DerefMut for Pooled<T, K> {
555 fn deref_mut(&mut self) -> &mut T {
556 self.as_mut()
557 }
558}
559
560impl<T: Poolable, K: Key> Drop for Pooled<T, K> {
561 fn drop(&mut self) {
562 if let Some(value) = self.value.take() {
563 if !value.is_open() {
564 return;
567 }
568
569 if let Some(pool) = self.pool.upgrade() {
570 if let Ok(mut inner) = pool.lock() {
571 inner.put(self.key.clone(), value, &pool);
572 }
573 } else if !value.can_share() {
574 trace!("pool dropped, dropping pooled ({:?})", self.key);
575 }
576 }
579 }
580}
581
582impl<T: Poolable, K: Key> fmt::Debug for Pooled<T, K> {
583 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
584 f.debug_struct("Pooled").field("key", &self.key).finish()
585 }
586}
587
588struct Idle<T> {
589 idle_at: Instant,
590 value: T,
591}
592
593#[allow(missing_debug_implementations)]
595pub struct Checkout<T, K: Key> {
596 key: K,
597 pool: Pool<T, K>,
598 waiter: Option<oneshot::Receiver<T>>,
599}
600
601#[derive(Debug)]
602#[non_exhaustive]
603pub enum Error {
604 PoolDisabled,
605 CheckoutNoLongerWanted,
606 CheckedOutClosedValue,
607}
608
609impl Error {
610 pub(super) fn is_canceled(&self) -> bool {
611 matches!(self, Error::CheckedOutClosedValue)
612 }
613}
614
615impl fmt::Display for Error {
616 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617 f.write_str(match self {
618 Error::PoolDisabled => "pool is disabled",
619 Error::CheckedOutClosedValue => "checked out connection was closed",
620 Error::CheckoutNoLongerWanted => "request was canceled",
621 })
622 }
623}
624
625impl StdError for Error {}
626
627impl<T: Poolable, K: Key> Checkout<T, K> {
628 fn poll_waiter(
629 &mut self,
630 cx: &mut task::Context<'_>,
631 ) -> Poll<Option<Result<Pooled<T, K>, Error>>> {
632 if let Some(mut rx) = self.waiter.take() {
633 match Pin::new(&mut rx).poll(cx) {
634 Poll::Ready(Ok(value)) => {
635 if value.is_open() {
636 Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
637 } else {
638 Poll::Ready(Some(Err(Error::CheckedOutClosedValue)))
639 }
640 }
641 Poll::Pending => {
642 self.waiter = Some(rx);
643 Poll::Pending
644 }
645 Poll::Ready(Err(_canceled)) => {
646 Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted)))
647 }
648 }
649 } else {
650 Poll::Ready(None)
651 }
652 }
653
654 fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T, K>> {
655 let entry = {
656 let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
657 let expiration = Expiration::new(inner.timeout);
658 let now = inner.now();
659 let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
660 trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
661 {
664 let popper = IdlePopper {
665 key: &self.key,
666 list,
667 };
668 popper.pop(&expiration, now)
669 }
670 .map(|e| (e, list.is_empty()))
671 });
672
673 let (entry, empty) = if let Some((e, empty)) = maybe_entry {
674 (Some(e), empty)
675 } else {
676 (None, true)
678 };
679 if empty {
680 inner.idle.remove(&self.key);
682 }
683
684 if entry.is_none() && self.waiter.is_none() {
685 let (tx, mut rx) = oneshot::channel();
686 trace!("checkout waiting for idle connection: {:?}", self.key);
687 inner
688 .waiters
689 .entry(self.key.clone())
690 .or_insert_with(VecDeque::new)
691 .push_back(tx);
692
693 assert!(Pin::new(&mut rx).poll(cx).is_pending());
695 self.waiter = Some(rx);
696 }
697
698 entry
699 };
700
701 entry.map(|e| self.pool.reuse(&self.key, e.value))
702 }
703}
704
705impl<T: Poolable, K: Key> Future for Checkout<T, K> {
706 type Output = Result<Pooled<T, K>, Error>;
707
708 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
709 if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
710 return Poll::Ready(Ok(pooled));
711 }
712
713 if let Some(pooled) = self.checkout(cx) {
714 Poll::Ready(Ok(pooled))
715 } else if !self.pool.is_enabled() {
716 Poll::Ready(Err(Error::PoolDisabled))
717 } else {
718 debug_assert!(self.waiter.is_some());
720 Poll::Pending
721 }
722 }
723}
724
725impl<T, K: Key> Drop for Checkout<T, K> {
726 fn drop(&mut self) {
727 if self.waiter.take().is_some() {
728 trace!("checkout dropped for {:?}", self.key);
729 if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
730 inner.clean_waiters(&self.key);
731 }
732 }
733 }
734}
735
736#[allow(missing_debug_implementations)]
738pub struct Connecting<T: Poolable, K: Key> {
739 key: K,
740 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
741}
742
743impl<T: Poolable, K: Key> Connecting<T, K> {
744 pub fn alpn_h2(self, pool: &Pool<T, K>) -> Option<Self> {
745 debug_assert!(
746 self.pool.0.is_none(),
747 "Connecting::alpn_h2 but already Http2"
748 );
749
750 pool.connecting(&self.key, Ver::Http2)
751 }
752}
753
754impl<T: Poolable, K: Key> Drop for Connecting<T, K> {
755 fn drop(&mut self) {
756 if let Some(pool) = self.pool.upgrade() {
757 if let Ok(mut inner) = pool.lock() {
759 inner.connected(&self.key);
760 }
761 }
762 }
763}
764
765struct Expiration(Option<Duration>);
766
767impl Expiration {
768 fn new(dur: Option<Duration>) -> Expiration {
769 Expiration(dur)
770 }
771
772 fn expires(&self, instant: Instant, now: Instant) -> bool {
773 match self.0 {
774 Some(timeout) => now.saturating_duration_since(instant) > timeout,
776 None => false,
777 }
778 }
779}
780
781struct IdleTask<T, K: Key> {
782 timer: Timer,
783 duration: Duration,
784 pool: WeakOpt<Mutex<PoolInner<T, K>>>,
785 pool_drop_notifier: oneshot::Receiver<Infallible>,
789}
790
791impl<T: Poolable + 'static, K: Key> IdleTask<T, K> {
792 async fn run(self) {
793 use futures_util::future;
794
795 let mut sleep = self.timer.sleep_until(self.timer.now() + self.duration);
796 let mut on_pool_drop = self.pool_drop_notifier;
797 loop {
798 match future::select(&mut on_pool_drop, &mut sleep).await {
799 future::Either::Left(_) => {
800 break;
802 }
803 future::Either::Right(((), _)) => {
804 if let Some(inner) = self.pool.upgrade() {
805 if let Ok(mut inner) = inner.lock() {
806 trace!("idle interval checking for expired");
807 inner.clear_expired();
808 }
809 }
810
811 let deadline = self.timer.now() + self.duration;
812 self.timer.reset(&mut sleep, deadline);
813 }
814 }
815 }
816
817 trace!("pool closed, canceling idle interval");
818 return;
819 }
820}
821
822impl<T> WeakOpt<T> {
823 fn none() -> Self {
824 WeakOpt(None)
825 }
826
827 fn downgrade(arc: &Arc<T>) -> Self {
828 WeakOpt(Some(Arc::downgrade(arc)))
829 }
830
831 fn upgrade(&self) -> Option<Arc<T>> {
832 self.0.as_ref().and_then(Weak::upgrade)
833 }
834}
835
836#[cfg(test)]
837mod tests {
838 use std::fmt::Debug;
839 use std::future::Future;
840 use std::hash::Hash;
841 use std::pin::Pin;
842 use std::task::{self, Poll};
843 use std::time::Duration;
844
845 use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
846 use crate::rt::{TokioExecutor, TokioTimer};
847
848 use crate::common::timer;
849
850 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
851 struct KeyImpl(http::uri::Scheme, http::uri::Authority);
852
853 type KeyTuple = (http::uri::Scheme, http::uri::Authority);
854
855 #[derive(Debug, PartialEq, Eq)]
857 struct Uniq<T>(T);
858
859 impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
860 fn is_open(&self) -> bool {
861 true
862 }
863
864 fn reserve(self) -> Reservation<Self> {
865 Reservation::Unique(self)
866 }
867
868 fn can_share(&self) -> bool {
869 false
870 }
871 }
872
873 fn c<T: Poolable, K: Key>(key: K) -> Connecting<T, K> {
874 Connecting {
875 key,
876 pool: WeakOpt::none(),
877 }
878 }
879
880 fn host_key(s: &str) -> KeyImpl {
881 KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key"))
882 }
883
884 fn pool_no_timer<T, K: Key>() -> Pool<T, K> {
885 pool_max_idle_no_timer(usize::MAX)
886 }
887
888 fn pool_max_idle_no_timer<T, K: Key>(max_idle: usize) -> Pool<T, K> {
889 let pool = Pool::new(
890 super::Config {
891 idle_timeout: Some(Duration::from_millis(100)),
892 max_idle_per_host: max_idle,
893 },
894 TokioExecutor::new(),
895 Option::<timer::Timer>::None,
896 );
897 pool.no_timer();
898 pool
899 }
900
901 #[tokio::test]
902 async fn test_pool_checkout_smoke() {
903 let pool = pool_no_timer();
904 let key = host_key("foo");
905 let pooled = pool.pooled(c(key.clone()), Uniq(41));
906
907 drop(pooled);
908
909 match pool.checkout(key).await {
910 Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
911 Err(_) => panic!("not ready"),
912 };
913 }
914
915 struct PollOnce<'a, F>(&'a mut F);
917
918 impl<F, T, U> Future for PollOnce<'_, F>
919 where
920 F: Future<Output = Result<T, U>> + Unpin,
921 {
922 type Output = Option<()>;
923
924 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
925 match Pin::new(&mut self.0).poll(cx) {
926 Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
927 Poll::Ready(Err(_)) => Poll::Ready(Some(())),
928 Poll::Pending => Poll::Ready(None),
929 }
930 }
931 }
932
933 #[tokio::test]
934 async fn test_pool_checkout_returns_none_if_expired() {
935 let pool = pool_no_timer();
936 let key = host_key("foo");
937 let pooled = pool.pooled(c(key.clone()), Uniq(41));
938
939 drop(pooled);
940 tokio::time::sleep(pool.locked().timeout.unwrap()).await;
941 let mut checkout = pool.checkout(key);
942 let poll_once = PollOnce(&mut checkout);
943 let is_not_ready = poll_once.await.is_none();
944 assert!(is_not_ready);
945 }
946
947 #[tokio::test]
948 async fn test_pool_checkout_removes_expired() {
949 let pool = pool_no_timer();
950 let key = host_key("foo");
951
952 pool.pooled(c(key.clone()), Uniq(41));
953 pool.pooled(c(key.clone()), Uniq(5));
954 pool.pooled(c(key.clone()), Uniq(99));
955
956 assert_eq!(
957 pool.locked().idle.get(&key).map(|entries| entries.len()),
958 Some(3)
959 );
960 tokio::time::sleep(pool.locked().timeout.unwrap()).await;
961
962 let mut checkout = pool.checkout(key.clone());
963 let poll_once = PollOnce(&mut checkout);
964 poll_once.await;
966 assert!(!pool.locked().idle.contains_key(&key));
967 }
968
969 #[test]
970 fn test_pool_max_idle_per_host() {
971 let pool = pool_max_idle_no_timer(2);
972 let key = host_key("foo");
973
974 pool.pooled(c(key.clone()), Uniq(41));
975 pool.pooled(c(key.clone()), Uniq(5));
976 pool.pooled(c(key.clone()), Uniq(99));
977
978 assert_eq!(
980 pool.locked().idle.get(&key).map(|entries| entries.len()),
981 Some(2)
982 );
983 }
984
985 #[tokio::test]
986 async fn test_pool_timer_removes_expired_realtime() {
987 test_pool_timer_removes_expired_inner().await
988 }
989
990 #[tokio::test(start_paused = true)]
991 async fn test_pool_timer_removes_expired_faketime() {
992 test_pool_timer_removes_expired_inner().await
993 }
994
995 async fn test_pool_timer_removes_expired_inner() {
996 let pool = Pool::new(
997 super::Config {
998 idle_timeout: Some(Duration::from_millis(10)),
999 max_idle_per_host: usize::MAX,
1000 },
1001 TokioExecutor::new(),
1002 Some(TokioTimer::new()),
1003 );
1004
1005 let key = host_key("foo");
1006
1007 pool.pooled(c(key.clone()), Uniq(41));
1008 pool.pooled(c(key.clone()), Uniq(5));
1009 pool.pooled(c(key.clone()), Uniq(99));
1010
1011 assert_eq!(
1012 pool.locked().idle.get(&key).map(|entries| entries.len()),
1013 Some(3)
1014 );
1015
1016 tokio::time::sleep(Duration::from_millis(30)).await;
1018
1019 assert_eq!(
1021 pool.locked().idle.get(&key).map(|entries| entries.len()),
1022 Some(3)
1023 );
1024
1025 tokio::time::sleep(Duration::from_millis(70)).await;
1027 tokio::task::yield_now().await;
1029
1030 assert!(!pool.locked().idle.contains_key(&key));
1031 }
1032
1033 #[tokio::test]
1034 async fn test_pool_checkout_task_unparked() {
1035 use futures_util::future::join;
1036 use futures_util::FutureExt;
1037
1038 let pool = pool_no_timer();
1039 let key = host_key("foo");
1040 let pooled = pool.pooled(c(key.clone()), Uniq(41));
1041
1042 let checkout = join(pool.checkout(key), async {
1043 drop(pooled);
1049 })
1050 .map(|(entry, _)| entry);
1051
1052 assert_eq!(*checkout.await.unwrap(), Uniq(41));
1053 }
1054
1055 #[tokio::test]
1056 async fn test_pool_checkout_drop_cleans_up_waiters() {
1057 let pool = pool_no_timer::<Uniq<i32>, KeyImpl>();
1058 let key = host_key("foo");
1059
1060 let mut checkout1 = pool.checkout(key.clone());
1061 let mut checkout2 = pool.checkout(key.clone());
1062
1063 let poll_once1 = PollOnce(&mut checkout1);
1064 let poll_once2 = PollOnce(&mut checkout2);
1065
1066 poll_once1.await;
1068 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1069 poll_once2.await;
1070 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
1071
1072 drop(checkout1);
1074 assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1075
1076 drop(checkout2);
1077 assert!(!pool.locked().waiters.contains_key(&key));
1078 }
1079
1080 #[derive(Debug)]
1081 struct CanClose {
1082 #[allow(unused)]
1083 val: i32,
1084 closed: bool,
1085 }
1086
1087 impl Poolable for CanClose {
1088 fn is_open(&self) -> bool {
1089 !self.closed
1090 }
1091
1092 fn reserve(self) -> Reservation<Self> {
1093 Reservation::Unique(self)
1094 }
1095
1096 fn can_share(&self) -> bool {
1097 false
1098 }
1099 }
1100
1101 #[test]
1102 fn pooled_drop_if_closed_doesnt_reinsert() {
1103 let pool = pool_no_timer();
1104 let key = host_key("foo");
1105 pool.pooled(
1106 c(key.clone()),
1107 CanClose {
1108 val: 57,
1109 closed: true,
1110 },
1111 );
1112
1113 assert!(!pool.locked().idle.contains_key(&key));
1114 }
1115}