1mod builder;
52mod config;
53mod dropguard;
54mod errors;
55mod hooks;
56mod metrics;
57pub mod reexports;
58
59use std::{
60 collections::VecDeque,
61 fmt,
62 future::Future,
63 marker::PhantomData,
64 ops::{Deref, DerefMut},
65 sync::{
66 atomic::{AtomicUsize, Ordering},
67 Arc, Mutex, Weak,
68 },
69 time::Duration,
70};
71
72#[cfg(not(target_arch = "wasm32"))]
73use std::time::Instant;
74
75use deadpool_runtime::Runtime;
76use tokio::sync::{Semaphore, TryAcquireError};
77
78pub use crate::Status;
79
80use self::dropguard::DropGuard;
81pub use self::{
82 builder::{BuildError, PoolBuilder},
83 config::{CreatePoolError, PoolConfig, QueueMode, Timeouts},
84 errors::{PoolError, RecycleError, TimeoutType},
85 hooks::{Hook, HookError, HookFuture, HookResult},
86 metrics::Metrics,
87};
88
89pub type RecycleResult<E> = Result<(), RecycleError<E>>;
91
92pub trait Manager: Sync + Send {
94 type Type: Send;
96 type Error: Send;
99
100 fn create(&self) -> impl Future<Output = Result<Self::Type, Self::Error>> + Send;
102
103 fn recycle(
109 &self,
110 obj: &mut Self::Type,
111 metrics: &Metrics,
112 ) -> impl Future<Output = RecycleResult<Self::Error>> + Send;
113
114 fn detach(&self, _obj: &mut Self::Type) {}
121}
122
123#[must_use]
129pub struct Object<M: Manager> {
130 inner: Option<ObjectInner<M>>,
132
133 pool: Weak<PoolInner<M>>,
135}
136
137impl<M> fmt::Debug for Object<M>
138where
139 M: fmt::Debug + Manager,
140 M::Type: fmt::Debug,
141{
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 f.debug_struct("Object")
144 .field("inner", &self.inner)
145 .finish()
146 }
147}
148
149struct UnreadyObject<'a, M: Manager> {
150 inner: Option<ObjectInner<M>>,
151 pool: &'a PoolInner<M>,
152}
153
154impl<M: Manager> UnreadyObject<'_, M> {
155 fn ready(mut self) -> ObjectInner<M> {
156 self.inner.take().unwrap()
157 }
158 fn inner(&mut self) -> &mut ObjectInner<M> {
159 self.inner.as_mut().unwrap()
160 }
161}
162
163impl<M: Manager> Drop for UnreadyObject<'_, M> {
164 fn drop(&mut self) {
165 if let Some(mut inner) = self.inner.take() {
166 self.pool.slots.lock().unwrap().size -= 1;
167 self.pool.manager.detach(&mut inner.obj);
168 }
169 }
170}
171
172#[derive(Debug)]
173pub(crate) struct ObjectInner<M: Manager> {
174 obj: M::Type,
176
177 metrics: Metrics,
179}
180
181impl<M: Manager> Object<M> {
182 #[must_use]
185 pub fn take(mut this: Self) -> M::Type {
186 let mut inner = this.inner.take().unwrap().obj;
187 if let Some(pool) = Object::pool(&this) {
188 pool.inner.detach_object(&mut inner)
189 }
190 inner
191 }
192
193 pub fn metrics(this: &Self) -> &Metrics {
195 &this.inner.as_ref().unwrap().metrics
196 }
197
198 pub fn pool(this: &Self) -> Option<Pool<M>> {
203 this.pool.upgrade().map(|inner| Pool {
204 inner,
205 _wrapper: PhantomData,
206 })
207 }
208}
209
210impl<M: Manager> Drop for Object<M> {
211 fn drop(&mut self) {
212 if let Some(inner) = self.inner.take() {
213 if let Some(pool) = self.pool.upgrade() {
214 pool.return_object(inner)
215 }
216 }
217 }
218}
219
220impl<M: Manager> Deref for Object<M> {
221 type Target = M::Type;
222 fn deref(&self) -> &M::Type {
223 &self.inner.as_ref().unwrap().obj
224 }
225}
226
227impl<M: Manager> DerefMut for Object<M> {
228 fn deref_mut(&mut self) -> &mut Self::Target {
229 &mut self.inner.as_mut().unwrap().obj
230 }
231}
232
233impl<M: Manager> AsRef<M::Type> for Object<M> {
234 fn as_ref(&self) -> &M::Type {
235 self
236 }
237}
238
239impl<M: Manager> AsMut<M::Type> for Object<M> {
240 fn as_mut(&mut self) -> &mut M::Type {
241 self
242 }
243}
244
245pub struct Pool<M: Manager, W: From<Object<M>> = Object<M>> {
250 inner: Arc<PoolInner<M>>,
251 _wrapper: PhantomData<fn() -> W>,
252}
253
254impl<M, W> fmt::Debug for Pool<M, W>
256where
257 M: fmt::Debug + Manager,
258 M::Type: fmt::Debug,
259 W: From<Object<M>>,
260{
261 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262 f.debug_struct("Pool")
263 .field("inner", &self.inner)
264 .field("wrapper", &self._wrapper)
265 .finish()
266 }
267}
268
269impl<M: Manager, W: From<Object<M>>> Clone for Pool<M, W> {
270 fn clone(&self) -> Self {
271 Self {
272 inner: self.inner.clone(),
273 _wrapper: PhantomData,
274 }
275 }
276}
277
278impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
279 pub fn builder(manager: M) -> PoolBuilder<M, W> {
283 PoolBuilder::new(manager)
284 }
285
286 pub(crate) fn from_builder(builder: PoolBuilder<M, W>) -> Self {
287 Self {
288 inner: Arc::new(PoolInner {
289 manager: builder.manager,
290 slots: Mutex::new(Slots {
291 vec: VecDeque::with_capacity(builder.config.max_size),
292 size: 0,
293 max_size: builder.config.max_size,
294 }),
295 users: AtomicUsize::new(0),
296 semaphore: Semaphore::new(builder.config.max_size),
297 config: builder.config,
298 hooks: builder.hooks,
299 runtime: builder.runtime,
300 }),
301 _wrapper: PhantomData,
302 }
303 }
304
305 pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
312 self.timeout_get(&self.timeouts()).await
313 }
314
315 pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
322 let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
323 let users_guard = DropGuard(|| {
324 let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
325 });
326
327 let non_blocking = match timeouts.wait {
328 Some(t) => t.as_nanos() == 0,
329 None => false,
330 };
331
332 let permit = if non_blocking {
333 self.inner.semaphore.try_acquire().map_err(|e| match e {
334 TryAcquireError::Closed => PoolError::Closed,
335 TryAcquireError::NoPermits => PoolError::Timeout(TimeoutType::Wait),
336 })?
337 } else {
338 apply_timeout(
339 self.inner.runtime,
340 TimeoutType::Wait,
341 timeouts.wait,
342 async {
343 self.inner
344 .semaphore
345 .acquire()
346 .await
347 .map_err(|_| PoolError::Closed)
348 },
349 )
350 .await?
351 };
352
353 let inner_obj = loop {
354 let inner_obj = match self.inner.config.queue_mode {
355 QueueMode::Fifo => self.inner.slots.lock().unwrap().vec.pop_front(),
356 QueueMode::Lifo => self.inner.slots.lock().unwrap().vec.pop_back(),
357 };
358 let inner_obj = if let Some(inner_obj) = inner_obj {
359 self.try_recycle(timeouts, inner_obj).await?
360 } else {
361 self.try_create(timeouts).await?
362 };
363 if let Some(inner_obj) = inner_obj {
364 break inner_obj;
365 }
366 };
367
368 users_guard.disarm();
369 permit.forget();
370
371 Ok(Object {
372 inner: Some(inner_obj),
373 pool: Arc::downgrade(&self.inner),
374 }
375 .into())
376 }
377
378 #[inline]
379 async fn try_recycle(
380 &self,
381 timeouts: &Timeouts,
382 inner_obj: ObjectInner<M>,
383 ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
384 let mut unready_obj = UnreadyObject {
385 inner: Some(inner_obj),
386 pool: &self.inner,
387 };
388 let inner = unready_obj.inner();
389
390 if let Err(_e) = self.inner.hooks.pre_recycle.apply(inner).await {
392 return Ok(None);
394 }
395
396 if apply_timeout(
397 self.inner.runtime,
398 TimeoutType::Recycle,
399 timeouts.recycle,
400 self.inner.manager.recycle(&mut inner.obj, &inner.metrics),
401 )
402 .await
403 .is_err()
404 {
405 return Ok(None);
406 }
407
408 if let Err(_e) = self.inner.hooks.post_recycle.apply(inner).await {
410 return Ok(None);
412 }
413
414 inner.metrics.recycle_count += 1;
415 #[cfg(not(target_arch = "wasm32"))]
416 {
417 inner.metrics.recycled = Some(Instant::now());
418 }
419
420 Ok(Some(unready_obj.ready()))
421 }
422
423 #[inline]
424 async fn try_create(
425 &self,
426 timeouts: &Timeouts,
427 ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
428 let mut unready_obj = UnreadyObject {
429 inner: Some(ObjectInner {
430 obj: apply_timeout(
431 self.inner.runtime,
432 TimeoutType::Create,
433 timeouts.create,
434 self.inner.manager.create(),
435 )
436 .await?,
437 metrics: Metrics::default(),
438 }),
439 pool: &self.inner,
440 };
441
442 self.inner.slots.lock().unwrap().size += 1;
443
444 if let Err(e) = self
446 .inner
447 .hooks
448 .post_create
449 .apply(unready_obj.inner())
450 .await
451 {
452 return Err(PoolError::PostCreateHook(e));
453 }
454
455 Ok(Some(unready_obj.ready()))
456 }
457
458 pub fn resize(&self, max_size: usize) {
466 if self.inner.semaphore.is_closed() {
467 return;
468 }
469 let mut slots = self.inner.slots.lock().unwrap();
470 let old_max_size = slots.max_size;
471 slots.max_size = max_size;
472 if max_size < old_max_size {
474 while slots.size > slots.max_size {
475 if let Ok(permit) = self.inner.semaphore.try_acquire() {
476 permit.forget();
477 if slots.vec.pop_front().is_some() {
478 slots.size -= 1;
479 }
480 } else {
481 break;
482 }
483 }
484 let mut vec = VecDeque::with_capacity(max_size);
486 for obj in slots.vec.drain(..) {
487 vec.push_back(obj);
488 }
489 slots.vec = vec;
490 }
491 if max_size > old_max_size {
493 let additional = slots.max_size - old_max_size;
494 slots.vec.reserve_exact(additional);
495 self.inner.semaphore.add_permits(additional);
496 }
497 }
498
499 pub fn retain(
523 &self,
524 mut predicate: impl FnMut(&M::Type, Metrics) -> bool,
525 ) -> RetainResult<M::Type> {
526 let mut removed = Vec::with_capacity(self.status().size);
527 let mut guard = self.inner.slots.lock().unwrap();
528 let mut i = 0;
529 while i < guard.vec.len() {
532 let obj = &mut guard.vec[i];
533 if predicate(&mut obj.obj, obj.metrics) {
534 i += 1;
535 } else {
536 let mut obj = guard.vec.remove(i).unwrap();
537 self.manager().detach(&mut obj.obj);
538 removed.push(obj.obj);
539 }
540 }
541 guard.size -= removed.len();
542 RetainResult {
543 retained: i,
544 removed,
545 }
546 }
547
548 pub fn timeouts(&self) -> Timeouts {
550 self.inner.config.timeouts
551 }
552
553 pub fn close(&self) {
560 self.resize(0);
561 self.inner.semaphore.close();
562 }
563
564 pub fn is_closed(&self) -> bool {
566 self.inner.semaphore.is_closed()
567 }
568
569 #[must_use]
571 pub fn status(&self) -> Status {
572 let slots = self.inner.slots.lock().unwrap();
573 let users = self.inner.users.load(Ordering::Relaxed);
574 let (available, waiting) = if users < slots.size {
575 (slots.size - users, 0)
576 } else {
577 (0, users - slots.size)
578 };
579 Status {
580 max_size: slots.max_size,
581 size: slots.size,
582 available,
583 waiting,
584 }
585 }
586
587 #[must_use]
589 pub fn manager(&self) -> &M {
590 &self.inner.manager
591 }
592}
593
594struct PoolInner<M: Manager> {
595 manager: M,
596 slots: Mutex<Slots<ObjectInner<M>>>,
597 users: AtomicUsize,
601 semaphore: Semaphore,
602 config: PoolConfig,
603 runtime: Option<Runtime>,
604 hooks: hooks::Hooks<M>,
605}
606
607#[derive(Debug)]
608struct Slots<T> {
609 vec: VecDeque<T>,
610 size: usize,
611 max_size: usize,
612}
613
614impl<M> fmt::Debug for PoolInner<M>
616where
617 M: fmt::Debug + Manager,
618 M::Type: fmt::Debug,
619{
620 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
621 f.debug_struct("PoolInner")
622 .field("manager", &self.manager)
623 .field("slots", &self.slots)
624 .field("used", &self.users)
625 .field("semaphore", &self.semaphore)
626 .field("config", &self.config)
627 .field("runtime", &self.runtime)
628 .field("hooks", &self.hooks)
629 .finish()
630 }
631}
632
633impl<M: Manager> PoolInner<M> {
634 fn return_object(&self, mut inner: ObjectInner<M>) {
635 let _ = self.users.fetch_sub(1, Ordering::Relaxed);
636 let mut slots = self.slots.lock().unwrap();
637 if slots.size <= slots.max_size {
638 slots.vec.push_back(inner);
639 drop(slots);
640 self.semaphore.add_permits(1);
641 } else {
642 slots.size -= 1;
643 drop(slots);
644 self.manager.detach(&mut inner.obj);
645 }
646 }
647 fn detach_object(&self, obj: &mut M::Type) {
648 let _ = self.users.fetch_sub(1, Ordering::Relaxed);
649 let mut slots = self.slots.lock().unwrap();
650 let add_permits = slots.size <= slots.max_size;
651 slots.size -= 1;
652 drop(slots);
653 if add_permits {
654 self.semaphore.add_permits(1);
655 }
656 self.manager.detach(obj);
657 }
658}
659
660async fn apply_timeout<O, E>(
661 runtime: Option<Runtime>,
662 timeout_type: TimeoutType,
663 duration: Option<Duration>,
664 future: impl Future<Output = Result<O, impl Into<PoolError<E>>>>,
665) -> Result<O, PoolError<E>> {
666 match (runtime, duration) {
667 (_, None) => future.await.map_err(Into::into),
668 (Some(runtime), Some(duration)) => runtime
669 .timeout(duration, future)
670 .await
671 .ok_or(PoolError::Timeout(timeout_type))?
672 .map_err(Into::into),
673 (None, Some(_)) => Err(PoolError::NoRuntimeSpecified),
674 }
675}
676
677#[derive(Debug)]
678pub struct RetainResult<T> {
680 pub retained: usize,
682 pub removed: Vec<T>,
684}
685
686impl<T> Default for RetainResult<T> {
687 fn default() -> Self {
688 Self {
689 retained: Default::default(),
690 removed: Default::default(),
691 }
692 }
693}