deadpool/managed/
mod.rs

1//! Managed version of the pool.
2//!
3//! "Managed" means that it requires a [`Manager`] which is responsible for
4//! creating and recycling objects as they are needed.
5//!
6//! # Example
7//!
8//! ```rust
9//! use deadpool::managed;
10//!
11//! #[derive(Debug)]
12//! enum Error { Fail }
13//!
14//! struct Computer {}
15//!
16//! impl Computer {
17//!     async fn get_answer(&self) -> i32 {
18//!         42
19//!     }
20//! }
21//!
22//! struct Manager {}
23//!
24//! impl managed::Manager for Manager {
25//!     type Type = Computer;
26//!     type Error = Error;
27//!
28//!     async fn create(&self) -> Result<Computer, Error> {
29//!         Ok(Computer {})
30//!     }
31//!     async fn recycle(&self, conn: &mut Computer, _: &managed::Metrics) -> managed::RecycleResult<Error> {
32//!         Ok(())
33//!     }
34//! }
35//!
36//! type Pool = managed::Pool<Manager>;
37//!
38//! #[tokio::main]
39//! async fn main() {
40//!     let mgr = Manager {};
41//!     let pool = Pool::builder(mgr).max_size(16).build().unwrap();
42//!     let mut conn = pool.get().await.unwrap();
43//!     let answer = conn.get_answer().await;
44//!     assert_eq!(answer, 42);
45//! }
46//! ```
47//!
48//! For a more complete example please see
49//! [`deadpool-postgres`](https://crates.io/crates/deadpool-postgres) crate.
50
51mod builder;
52mod config;
53mod dropguard;
54mod errors;
55mod hooks;
56mod metrics;
57pub mod reexports;
58
59use std::{
60    collections::VecDeque,
61    fmt,
62    future::Future,
63    marker::PhantomData,
64    ops::{Deref, DerefMut},
65    sync::{
66        atomic::{AtomicUsize, Ordering},
67        Arc, Mutex, Weak,
68    },
69    time::Duration,
70};
71
72#[cfg(not(target_arch = "wasm32"))]
73use std::time::Instant;
74
75use deadpool_runtime::Runtime;
76use tokio::sync::{Semaphore, TryAcquireError};
77
78pub use crate::Status;
79
80use self::dropguard::DropGuard;
81pub use self::{
82    builder::{BuildError, PoolBuilder},
83    config::{CreatePoolError, PoolConfig, QueueMode, Timeouts},
84    errors::{PoolError, RecycleError, TimeoutType},
85    hooks::{Hook, HookError, HookFuture, HookResult},
86    metrics::Metrics,
87};
88
89/// Result type of the [`Manager::recycle()`] method.
90pub type RecycleResult<E> = Result<(), RecycleError<E>>;
91
92/// Manager responsible for creating new [`Object`]s or recycling existing ones.
93pub trait Manager: Sync + Send {
94    /// Type of [`Object`]s that this [`Manager`] creates and recycles.
95    type Type: Send;
96    /// Error that this [`Manager`] can return when creating and/or recycling
97    /// [`Object`]s.
98    type Error: Send;
99
100    /// Creates a new instance of [`Manager::Type`].
101    fn create(&self) -> impl Future<Output = Result<Self::Type, Self::Error>> + Send;
102
103    /// Tries to recycle an instance of [`Manager::Type`].
104    ///
105    /// # Errors
106    ///
107    /// Returns [`Manager::Error`] if the instance couldn't be recycled.
108    fn recycle(
109        &self,
110        obj: &mut Self::Type,
111        metrics: &Metrics,
112    ) -> impl Future<Output = RecycleResult<Self::Error>> + Send;
113
114    /// Detaches an instance of [`Manager::Type`] from this [`Manager`].
115    ///
116    /// This method is called when using the [`Object::take()`] method for
117    /// removing an [`Object`] from a [`Pool`]. If the [`Manager`] doesn't hold
118    /// any references to the handed out [`Object`]s then the default
119    /// implementation can be used which does nothing.
120    fn detach(&self, _obj: &mut Self::Type) {}
121}
122
123/// Wrapper around the actual pooled object which implements [`Deref`],
124/// [`DerefMut`] and [`Drop`] traits.
125///
126/// Use this object just as if it was of type `T` and upon leaving a scope the
127/// [`Drop::drop()`] will take care of returning it to the pool.
128#[must_use]
129pub struct Object<M: Manager> {
130    /// The actual object
131    inner: Option<ObjectInner<M>>,
132
133    /// Pool to return the pooled object to.
134    pool: Weak<PoolInner<M>>,
135}
136
137impl<M> fmt::Debug for Object<M>
138where
139    M: fmt::Debug + Manager,
140    M::Type: fmt::Debug,
141{
142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143        f.debug_struct("Object")
144            .field("inner", &self.inner)
145            .finish()
146    }
147}
148
149struct UnreadyObject<'a, M: Manager> {
150    inner: Option<ObjectInner<M>>,
151    pool: &'a PoolInner<M>,
152}
153
154impl<M: Manager> UnreadyObject<'_, M> {
155    fn ready(mut self) -> ObjectInner<M> {
156        self.inner.take().unwrap()
157    }
158    fn inner(&mut self) -> &mut ObjectInner<M> {
159        self.inner.as_mut().unwrap()
160    }
161}
162
163impl<M: Manager> Drop for UnreadyObject<'_, M> {
164    fn drop(&mut self) {
165        if let Some(mut inner) = self.inner.take() {
166            self.pool.slots.lock().unwrap().size -= 1;
167            self.pool.manager.detach(&mut inner.obj);
168        }
169    }
170}
171
172#[derive(Debug)]
173pub(crate) struct ObjectInner<M: Manager> {
174    /// Actual pooled object.
175    obj: M::Type,
176
177    /// Object metrics.
178    metrics: Metrics,
179}
180
181impl<M: Manager> Object<M> {
182    /// Takes this [`Object`] from its [`Pool`] permanently. This reduces the
183    /// size of the [`Pool`].
184    #[must_use]
185    pub fn take(mut this: Self) -> M::Type {
186        let mut inner = this.inner.take().unwrap().obj;
187        if let Some(pool) = Object::pool(&this) {
188            pool.inner.detach_object(&mut inner)
189        }
190        inner
191    }
192
193    /// Get object statistics
194    pub fn metrics(this: &Self) -> &Metrics {
195        &this.inner.as_ref().unwrap().metrics
196    }
197
198    /// Returns the [`Pool`] this [`Object`] belongs to.
199    ///
200    /// Since [`Object`]s only hold a [`Weak`] reference to the [`Pool`] they
201    /// come from, this can fail and return [`None`] instead.
202    pub fn pool(this: &Self) -> Option<Pool<M>> {
203        this.pool.upgrade().map(|inner| Pool {
204            inner,
205            _wrapper: PhantomData,
206        })
207    }
208}
209
210impl<M: Manager> Drop for Object<M> {
211    fn drop(&mut self) {
212        if let Some(inner) = self.inner.take() {
213            if let Some(pool) = self.pool.upgrade() {
214                pool.return_object(inner)
215            }
216        }
217    }
218}
219
220impl<M: Manager> Deref for Object<M> {
221    type Target = M::Type;
222    fn deref(&self) -> &M::Type {
223        &self.inner.as_ref().unwrap().obj
224    }
225}
226
227impl<M: Manager> DerefMut for Object<M> {
228    fn deref_mut(&mut self) -> &mut Self::Target {
229        &mut self.inner.as_mut().unwrap().obj
230    }
231}
232
233impl<M: Manager> AsRef<M::Type> for Object<M> {
234    fn as_ref(&self) -> &M::Type {
235        self
236    }
237}
238
239impl<M: Manager> AsMut<M::Type> for Object<M> {
240    fn as_mut(&mut self) -> &mut M::Type {
241        self
242    }
243}
244
245/// Generic object and connection pool.
246///
247/// This struct can be cloned and transferred across thread boundaries and uses
248/// reference counting for its internal state.
249pub struct Pool<M: Manager, W: From<Object<M>> = Object<M>> {
250    inner: Arc<PoolInner<M>>,
251    _wrapper: PhantomData<fn() -> W>,
252}
253
254// Implemented manually to avoid unnecessary trait bound on `W` type parameter.
255impl<M, W> fmt::Debug for Pool<M, W>
256where
257    M: fmt::Debug + Manager,
258    M::Type: fmt::Debug,
259    W: From<Object<M>>,
260{
261    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262        f.debug_struct("Pool")
263            .field("inner", &self.inner)
264            .field("wrapper", &self._wrapper)
265            .finish()
266    }
267}
268
269impl<M: Manager, W: From<Object<M>>> Clone for Pool<M, W> {
270    fn clone(&self) -> Self {
271        Self {
272            inner: self.inner.clone(),
273            _wrapper: PhantomData,
274        }
275    }
276}
277
278impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
279    /// Instantiates a builder for a new [`Pool`].
280    ///
281    /// This is the only way to create a [`Pool`] instance.
282    pub fn builder(manager: M) -> PoolBuilder<M, W> {
283        PoolBuilder::new(manager)
284    }
285
286    pub(crate) fn from_builder(builder: PoolBuilder<M, W>) -> Self {
287        Self {
288            inner: Arc::new(PoolInner {
289                manager: builder.manager,
290                slots: Mutex::new(Slots {
291                    vec: VecDeque::with_capacity(builder.config.max_size),
292                    size: 0,
293                    max_size: builder.config.max_size,
294                }),
295                users: AtomicUsize::new(0),
296                semaphore: Semaphore::new(builder.config.max_size),
297                config: builder.config,
298                hooks: builder.hooks,
299                runtime: builder.runtime,
300            }),
301            _wrapper: PhantomData,
302        }
303    }
304
305    /// Retrieves an [`Object`] from this [`Pool`] or waits for one to
306    /// become available.
307    ///
308    /// # Errors
309    ///
310    /// See [`PoolError`] for details.
311    pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
312        self.timeout_get(&self.timeouts()).await
313    }
314
315    /// Retrieves an [`Object`] from this [`Pool`] using a different `timeout`
316    /// than the configured one.
317    ///
318    /// # Errors
319    ///
320    /// See [`PoolError`] for details.
321    pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
322        let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
323        let users_guard = DropGuard(|| {
324            let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
325        });
326
327        let non_blocking = match timeouts.wait {
328            Some(t) => t.as_nanos() == 0,
329            None => false,
330        };
331
332        let permit = if non_blocking {
333            self.inner.semaphore.try_acquire().map_err(|e| match e {
334                TryAcquireError::Closed => PoolError::Closed,
335                TryAcquireError::NoPermits => PoolError::Timeout(TimeoutType::Wait),
336            })?
337        } else {
338            apply_timeout(
339                self.inner.runtime,
340                TimeoutType::Wait,
341                timeouts.wait,
342                async {
343                    self.inner
344                        .semaphore
345                        .acquire()
346                        .await
347                        .map_err(|_| PoolError::Closed)
348                },
349            )
350            .await?
351        };
352
353        let inner_obj = loop {
354            let inner_obj = match self.inner.config.queue_mode {
355                QueueMode::Fifo => self.inner.slots.lock().unwrap().vec.pop_front(),
356                QueueMode::Lifo => self.inner.slots.lock().unwrap().vec.pop_back(),
357            };
358            let inner_obj = if let Some(inner_obj) = inner_obj {
359                self.try_recycle(timeouts, inner_obj).await?
360            } else {
361                self.try_create(timeouts).await?
362            };
363            if let Some(inner_obj) = inner_obj {
364                break inner_obj;
365            }
366        };
367
368        users_guard.disarm();
369        permit.forget();
370
371        Ok(Object {
372            inner: Some(inner_obj),
373            pool: Arc::downgrade(&self.inner),
374        }
375        .into())
376    }
377
378    #[inline]
379    async fn try_recycle(
380        &self,
381        timeouts: &Timeouts,
382        inner_obj: ObjectInner<M>,
383    ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
384        let mut unready_obj = UnreadyObject {
385            inner: Some(inner_obj),
386            pool: &self.inner,
387        };
388        let inner = unready_obj.inner();
389
390        // Apply pre_recycle hooks
391        if let Err(_e) = self.inner.hooks.pre_recycle.apply(inner).await {
392            // TODO log pre_recycle error
393            return Ok(None);
394        }
395
396        if apply_timeout(
397            self.inner.runtime,
398            TimeoutType::Recycle,
399            timeouts.recycle,
400            self.inner.manager.recycle(&mut inner.obj, &inner.metrics),
401        )
402        .await
403        .is_err()
404        {
405            return Ok(None);
406        }
407
408        // Apply post_recycle hooks
409        if let Err(_e) = self.inner.hooks.post_recycle.apply(inner).await {
410            // TODO log post_recycle error
411            return Ok(None);
412        }
413
414        inner.metrics.recycle_count += 1;
415        #[cfg(not(target_arch = "wasm32"))]
416        {
417            inner.metrics.recycled = Some(Instant::now());
418        }
419
420        Ok(Some(unready_obj.ready()))
421    }
422
423    #[inline]
424    async fn try_create(
425        &self,
426        timeouts: &Timeouts,
427    ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
428        let mut unready_obj = UnreadyObject {
429            inner: Some(ObjectInner {
430                obj: apply_timeout(
431                    self.inner.runtime,
432                    TimeoutType::Create,
433                    timeouts.create,
434                    self.inner.manager.create(),
435                )
436                .await?,
437                metrics: Metrics::default(),
438            }),
439            pool: &self.inner,
440        };
441
442        self.inner.slots.lock().unwrap().size += 1;
443
444        // Apply post_create hooks
445        if let Err(e) = self
446            .inner
447            .hooks
448            .post_create
449            .apply(unready_obj.inner())
450            .await
451        {
452            return Err(PoolError::PostCreateHook(e));
453        }
454
455        Ok(Some(unready_obj.ready()))
456    }
457
458    /**
459     * Resize the pool. This change the `max_size` of the pool dropping
460     * excess objects and/or making space for new ones.
461     *
462     * If the pool is closed this method does nothing. The [`Pool::status`] method
463     * always reports a `max_size` of 0 for closed pools.
464     */
465    pub fn resize(&self, max_size: usize) {
466        if self.inner.semaphore.is_closed() {
467            return;
468        }
469        let mut slots = self.inner.slots.lock().unwrap();
470        let old_max_size = slots.max_size;
471        slots.max_size = max_size;
472        // shrink pool
473        if max_size < old_max_size {
474            while slots.size > slots.max_size {
475                if let Ok(permit) = self.inner.semaphore.try_acquire() {
476                    permit.forget();
477                    if slots.vec.pop_front().is_some() {
478                        slots.size -= 1;
479                    }
480                } else {
481                    break;
482                }
483            }
484            // Create a new VecDeque with a smaller capacity
485            let mut vec = VecDeque::with_capacity(max_size);
486            for obj in slots.vec.drain(..) {
487                vec.push_back(obj);
488            }
489            slots.vec = vec;
490        }
491        // grow pool
492        if max_size > old_max_size {
493            let additional = slots.max_size - old_max_size;
494            slots.vec.reserve_exact(additional);
495            self.inner.semaphore.add_permits(additional);
496        }
497    }
498
499    /// Retains only the objects specified by the given function.
500    ///
501    /// This function is typically used to remove objects from
502    /// the pool based on their current state or metrics.
503    ///
504    /// **Caution:** This function blocks the entire pool while
505    /// it is running. Therefore the given function should not
506    /// block.
507    ///
508    /// The following example starts a background task that
509    /// runs every 30 seconds and removes objects from the pool
510    /// that haven't been used for more than one minute.
511    ///
512    /// ```rust,ignore
513    /// let interval = Duration::from_secs(30);
514    /// let max_age = Duration::from_secs(60);
515    /// tokio::spawn(async move {
516    ///     loop {
517    ///         tokio::time::sleep(interval).await;
518    ///         pool.retain(|_, metrics| metrics.last_used() < max_age);
519    ///     }
520    /// });
521    /// ```
522    pub fn retain(
523        &self,
524        mut predicate: impl FnMut(&M::Type, Metrics) -> bool,
525    ) -> RetainResult<M::Type> {
526        let mut removed = Vec::with_capacity(self.status().size);
527        let mut guard = self.inner.slots.lock().unwrap();
528        let mut i = 0;
529        // This code can be simplified once `Vec::extract_if` lands in stable Rust.
530        // https://doc.rust-lang.org/std/vec/struct.Vec.html#method.extract_if
531        while i < guard.vec.len() {
532            let obj = &mut guard.vec[i];
533            if predicate(&mut obj.obj, obj.metrics) {
534                i += 1;
535            } else {
536                let mut obj = guard.vec.remove(i).unwrap();
537                self.manager().detach(&mut obj.obj);
538                removed.push(obj.obj);
539            }
540        }
541        guard.size -= removed.len();
542        RetainResult {
543            retained: i,
544            removed,
545        }
546    }
547
548    /// Get current timeout configuration
549    pub fn timeouts(&self) -> Timeouts {
550        self.inner.config.timeouts
551    }
552
553    /// Closes this [`Pool`].
554    ///
555    /// All current and future tasks waiting for [`Object`]s will return
556    /// [`PoolError::Closed`] immediately.
557    ///
558    /// This operation resizes the pool to 0.
559    pub fn close(&self) {
560        self.resize(0);
561        self.inner.semaphore.close();
562    }
563
564    /// Indicates whether this [`Pool`] has been closed.
565    pub fn is_closed(&self) -> bool {
566        self.inner.semaphore.is_closed()
567    }
568
569    /// Retrieves [`Status`] of this [`Pool`].
570    #[must_use]
571    pub fn status(&self) -> Status {
572        let slots = self.inner.slots.lock().unwrap();
573        let users = self.inner.users.load(Ordering::Relaxed);
574        let (available, waiting) = if users < slots.size {
575            (slots.size - users, 0)
576        } else {
577            (0, users - slots.size)
578        };
579        Status {
580            max_size: slots.max_size,
581            size: slots.size,
582            available,
583            waiting,
584        }
585    }
586
587    /// Returns [`Manager`] of this [`Pool`].
588    #[must_use]
589    pub fn manager(&self) -> &M {
590        &self.inner.manager
591    }
592}
593
594struct PoolInner<M: Manager> {
595    manager: M,
596    slots: Mutex<Slots<ObjectInner<M>>>,
597    /// Number of available [`Object`]s in the [`Pool`]. If there are no
598    /// [`Object`]s in the [`Pool`] this number can become negative and store
599    /// the number of [`Future`]s waiting for an [`Object`].
600    users: AtomicUsize,
601    semaphore: Semaphore,
602    config: PoolConfig,
603    runtime: Option<Runtime>,
604    hooks: hooks::Hooks<M>,
605}
606
607#[derive(Debug)]
608struct Slots<T> {
609    vec: VecDeque<T>,
610    size: usize,
611    max_size: usize,
612}
613
614// Implemented manually to avoid unnecessary trait bound on the struct.
615impl<M> fmt::Debug for PoolInner<M>
616where
617    M: fmt::Debug + Manager,
618    M::Type: fmt::Debug,
619{
620    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
621        f.debug_struct("PoolInner")
622            .field("manager", &self.manager)
623            .field("slots", &self.slots)
624            .field("used", &self.users)
625            .field("semaphore", &self.semaphore)
626            .field("config", &self.config)
627            .field("runtime", &self.runtime)
628            .field("hooks", &self.hooks)
629            .finish()
630    }
631}
632
633impl<M: Manager> PoolInner<M> {
634    fn return_object(&self, mut inner: ObjectInner<M>) {
635        let _ = self.users.fetch_sub(1, Ordering::Relaxed);
636        let mut slots = self.slots.lock().unwrap();
637        if slots.size <= slots.max_size {
638            slots.vec.push_back(inner);
639            drop(slots);
640            self.semaphore.add_permits(1);
641        } else {
642            slots.size -= 1;
643            drop(slots);
644            self.manager.detach(&mut inner.obj);
645        }
646    }
647    fn detach_object(&self, obj: &mut M::Type) {
648        let _ = self.users.fetch_sub(1, Ordering::Relaxed);
649        let mut slots = self.slots.lock().unwrap();
650        let add_permits = slots.size <= slots.max_size;
651        slots.size -= 1;
652        drop(slots);
653        if add_permits {
654            self.semaphore.add_permits(1);
655        }
656        self.manager.detach(obj);
657    }
658}
659
660async fn apply_timeout<O, E>(
661    runtime: Option<Runtime>,
662    timeout_type: TimeoutType,
663    duration: Option<Duration>,
664    future: impl Future<Output = Result<O, impl Into<PoolError<E>>>>,
665) -> Result<O, PoolError<E>> {
666    match (runtime, duration) {
667        (_, None) => future.await.map_err(Into::into),
668        (Some(runtime), Some(duration)) => runtime
669            .timeout(duration, future)
670            .await
671            .ok_or(PoolError::Timeout(timeout_type))?
672            .map_err(Into::into),
673        (None, Some(_)) => Err(PoolError::NoRuntimeSpecified),
674    }
675}
676
677#[derive(Debug)]
678/// This is the result returned by `Pool::retain`
679pub struct RetainResult<T> {
680    /// Number of retained objects
681    pub retained: usize,
682    /// Objects that were removed from the pool
683    pub removed: Vec<T>,
684}
685
686impl<T> Default for RetainResult<T> {
687    fn default() -> Self {
688        Self {
689            retained: Default::default(),
690            removed: Default::default(),
691        }
692    }
693}