tokio_postgres/
client.rs

1use crate::codec::BackendMessages;
2use crate::config::SslMode;
3use crate::connection::{Request, RequestMessages};
4use crate::copy_out::CopyOutStream;
5#[cfg(feature = "runtime")]
6use crate::keepalive::KeepaliveConfig;
7use crate::query::RowStream;
8use crate::simple_query::SimpleQueryStream;
9#[cfg(feature = "runtime")]
10use crate::tls::MakeTlsConnect;
11use crate::tls::TlsConnect;
12use crate::types::{Oid, ToSql, Type};
13#[cfg(feature = "runtime")]
14use crate::Socket;
15use crate::{
16    copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
17    Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
18};
19use bytes::{Buf, BytesMut};
20use fallible_iterator::FallibleIterator;
21use futures_channel::mpsc;
22use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
23use parking_lot::Mutex;
24use postgres_protocol::message::backend::Message;
25use postgres_types::BorrowToSql;
26use std::collections::HashMap;
27use std::fmt;
28#[cfg(feature = "runtime")]
29use std::net::IpAddr;
30#[cfg(feature = "runtime")]
31use std::path::PathBuf;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34#[cfg(feature = "runtime")]
35use std::time::Duration;
36use tokio::io::{AsyncRead, AsyncWrite};
37
/// The backend messages produced in reply to one request submitted through
/// `InnerClient::send`.
pub struct Responses {
    /// Receiving half of the bounded channel whose sending half travels inside the `Request`.
    receiver: mpsc::Receiver<BackendMessages>,
    /// The batch of messages currently being drained; swapped out whenever `receiver` yields a new batch.
    cur: BackendMessages,
}
42
43impl Responses {
44    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
45        loop {
46            match self.cur.next().map_err(Error::parse)? {
47                Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
48                Some(message) => return Poll::Ready(Ok(message)),
49                None => {}
50            }
51
52            match ready!(self.receiver.poll_next_unpin(cx)) {
53                Some(messages) => self.cur = messages,
54                None => return Poll::Ready(Err(Error::closed())),
55            }
56        }
57    }
58
59    pub async fn next(&mut self) -> Result<Message, Error> {
60        future::poll_fn(|cx| self.poll_next(cx)).await
61    }
62}
63
/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [prepare](prepare) module).
#[derive(Default)]
struct CachedTypeInfo {
    /// A statement for basic information for a type from its
    /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
    /// fallback).
    typeinfo: Option<Statement>,
    /// A statement for getting information for a composite type from its OID.
    /// Corresponds to [TYPEINFO_COMPOSITE_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
    typeinfo_composite: Option<Statement>,
    /// A statement for getting information for an enum type from its OID.
    ///
    /// NOTE(review): the previous comment was a copy of the composite one and
    /// linked `prepare::TYPEINFO_COMPOSITE_QUERY`; presumably the enum query in
    /// the `prepare` module (or its fallback) is what is meant — confirm.
    typeinfo_enum: Option<Statement>,

    /// Cache of types already looked up.
    types: HashMap<Oid, Type>,
}
83
/// The state a `Client` keeps behind an `Arc`: the request channel, the type-info cache and a
/// reusable write buffer.
pub struct InnerClient {
    /// Unbounded channel on which requests are submitted (presumably consumed by the connection
    /// task — confirm against the `connection` module).
    sender: mpsc::UnboundedSender<Request>,
    /// Cached type-lookup statements and already-resolved types.
    cached_typeinfo: Mutex<CachedTypeInfo>,

    /// A buffer to use when writing out postgres commands.
    buffer: Mutex<BytesMut>,
}
91
92impl InnerClient {
93    pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
94        let (sender, receiver) = mpsc::channel(1);
95        let request = Request { messages, sender };
96        self.sender
97            .unbounded_send(request)
98            .map_err(|_| Error::closed())?;
99
100        Ok(Responses {
101            receiver,
102            cur: BackendMessages::empty(),
103        })
104    }
105
106    pub fn typeinfo(&self) -> Option<Statement> {
107        self.cached_typeinfo.lock().typeinfo.clone()
108    }
109
110    pub fn set_typeinfo(&self, statement: &Statement) {
111        self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
112    }
113
114    pub fn typeinfo_composite(&self) -> Option<Statement> {
115        self.cached_typeinfo.lock().typeinfo_composite.clone()
116    }
117
118    pub fn set_typeinfo_composite(&self, statement: &Statement) {
119        self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
120    }
121
122    pub fn typeinfo_enum(&self) -> Option<Statement> {
123        self.cached_typeinfo.lock().typeinfo_enum.clone()
124    }
125
126    pub fn set_typeinfo_enum(&self, statement: &Statement) {
127        self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
128    }
129
130    pub fn type_(&self, oid: Oid) -> Option<Type> {
131        self.cached_typeinfo.lock().types.get(&oid).cloned()
132    }
133
134    pub fn set_type(&self, oid: Oid, type_: &Type) {
135        self.cached_typeinfo.lock().types.insert(oid, type_.clone());
136    }
137
138    pub fn clear_type_cache(&self) {
139        self.cached_typeinfo.lock().types.clear();
140    }
141
142    /// Call the given function with a buffer to be used when writing out
143    /// postgres commands.
144    pub fn with_buf<F, R>(&self, f: F) -> R
145    where
146        F: FnOnce(&mut BytesMut) -> R,
147    {
148        let mut buffer = self.buffer.lock();
149        let r = f(&mut buffer);
150        buffer.clear();
151        r
152    }
153}
154
/// Socket-level connection parameters kept on the client and cloned into every `CancelToken`
/// it hands out (presumably so that a cancel request can open its own connection — confirm).
#[cfg(feature = "runtime")]
#[derive(Clone)]
pub(crate) struct SocketConfig {
    /// Address of the server: an IP address or, on Unix, a filesystem path.
    pub addr: Addr,
    /// Host name associated with the address, if any.
    pub hostname: Option<String>,
    /// Port number.
    pub port: u16,
    /// Optional connect timeout.
    pub connect_timeout: Option<Duration>,
    /// Optional TCP user timeout.
    pub tcp_user_timeout: Option<Duration>,
    /// Optional keepalive settings.
    pub keepalive: Option<KeepaliveConfig>,
}
165
/// The address stored in a [`SocketConfig`].
#[cfg(feature = "runtime")]
#[derive(Clone)]
pub(crate) enum Addr {
    /// An IP address, reached over TCP.
    Tcp(IpAddr),
    /// A filesystem path (presumably a Unix domain socket — confirm); only available on Unix targets.
    #[cfg(unix)]
    Unix(PathBuf),
}
173
/// An asynchronous PostgreSQL client.
///
/// The client is one half of what is returned when a connection is established. Users interact with the database
/// through this client object.
pub struct Client {
    /// Shared state: request channel, type-info cache and write buffer.
    inner: Arc<InnerClient>,
    /// Socket parameters copied into cancel tokens; `None` until `set_socket_config` is called.
    #[cfg(feature = "runtime")]
    socket_config: Option<SocketConfig>,
    /// SSL mode, copied into cancel tokens.
    ssl_mode: SslMode,
    /// Backend process id, copied into cancel tokens.
    process_id: i32,
    /// Backend secret key, copied into cancel tokens.
    secret_key: i32,
}
186
187impl Client {
188    pub(crate) fn new(
189        sender: mpsc::UnboundedSender<Request>,
190        ssl_mode: SslMode,
191        process_id: i32,
192        secret_key: i32,
193    ) -> Client {
194        Client {
195            inner: Arc::new(InnerClient {
196                sender,
197                cached_typeinfo: Default::default(),
198                buffer: Default::default(),
199            }),
200            #[cfg(feature = "runtime")]
201            socket_config: None,
202            ssl_mode,
203            process_id,
204            secret_key,
205        }
206    }
207
208    pub(crate) fn inner(&self) -> &Arc<InnerClient> {
209        &self.inner
210    }
211
212    #[cfg(feature = "runtime")]
213    pub(crate) fn set_socket_config(&mut self, socket_config: SocketConfig) {
214        self.socket_config = Some(socket_config);
215    }
216
217    /// Creates a new prepared statement.
218    ///
219    /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
220    /// which are set when executed. Prepared statements can only be used with the connection that created them.
221    pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
222        self.prepare_typed(query, &[]).await
223    }
224
225    /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
226    ///
227    /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
228    /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
229    pub async fn prepare_typed(
230        &self,
231        query: &str,
232        parameter_types: &[Type],
233    ) -> Result<Statement, Error> {
234        prepare::prepare(&self.inner, query, parameter_types).await
235    }
236
237    /// Executes a statement, returning a vector of the resulting rows.
238    ///
239    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
240    /// provided, 1-indexed.
241    ///
242    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
243    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
244    /// with the `prepare` method.
245    pub async fn query<T>(
246        &self,
247        statement: &T,
248        params: &[&(dyn ToSql + Sync)],
249    ) -> Result<Vec<Row>, Error>
250    where
251        T: ?Sized + ToStatement,
252    {
253        self.query_raw(statement, slice_iter(params))
254            .await?
255            .try_collect()
256            .await
257    }
258
259    /// Executes a statement which returns a single row, returning it.
260    ///
261    /// Returns an error if the query does not return exactly one row.
262    ///
263    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
264    /// provided, 1-indexed.
265    ///
266    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
267    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
268    /// with the `prepare` method.
269    pub async fn query_one<T>(
270        &self,
271        statement: &T,
272        params: &[&(dyn ToSql + Sync)],
273    ) -> Result<Row, Error>
274    where
275        T: ?Sized + ToStatement,
276    {
277        self.query_opt(statement, params)
278            .await
279            .and_then(|res| res.ok_or_else(Error::row_count))
280    }
281
282    /// Executes a statements which returns zero or one rows, returning it.
283    ///
284    /// Returns an error if the query returns more than one row.
285    ///
286    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
287    /// provided, 1-indexed.
288    ///
289    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
290    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
291    /// with the `prepare` method.
292    pub async fn query_opt<T>(
293        &self,
294        statement: &T,
295        params: &[&(dyn ToSql + Sync)],
296    ) -> Result<Option<Row>, Error>
297    where
298        T: ?Sized + ToStatement,
299    {
300        let stream = self.query_raw(statement, slice_iter(params)).await?;
301        pin_mut!(stream);
302
303        let mut first = None;
304
305        // Originally this was two calls to `try_next().await?`,
306        // once for the first element, and second to error if more than one.
307        //
308        // However, this new form with only one .await in a loop generates
309        // slightly smaller codegen/stack usage for the resulting future.
310        while let Some(row) = stream.try_next().await? {
311            if first.is_some() {
312                return Err(Error::row_count());
313            }
314
315            first = Some(row);
316        }
317
318        Ok(first)
319    }
320
321    /// The maximally flexible version of [`query`].
322    ///
323    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
324    /// provided, 1-indexed.
325    ///
326    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
327    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
328    /// with the `prepare` method.
329    ///
330    /// [`query`]: #method.query
331    ///
332    /// # Examples
333    ///
334    /// ```no_run
335    /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
336    /// use futures_util::{pin_mut, TryStreamExt};
337    ///
338    /// let params: Vec<String> = vec![
339    ///     "first param".into(),
340    ///     "second param".into(),
341    /// ];
342    /// let mut it = client.query_raw(
343    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
344    ///     params,
345    /// ).await?;
346    ///
347    /// pin_mut!(it);
348    /// while let Some(row) = it.try_next().await? {
349    ///     let foo: i32 = row.get("foo");
350    ///     println!("foo: {}", foo);
351    /// }
352    /// # Ok(())
353    /// # }
354    /// ```
355    pub async fn query_raw<T, P, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
356    where
357        T: ?Sized + ToStatement,
358        P: BorrowToSql,
359        I: IntoIterator<Item = P>,
360        I::IntoIter: ExactSizeIterator,
361    {
362        let statement = statement.__convert().into_statement(self).await?;
363        query::query(&self.inner, statement, params).await
364    }
365
366    /// Like `query`, but requires the types of query parameters to be explicitly specified.
367    ///
368    /// Compared to `query`, this method allows performing queries without three round trips (for
369    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
370    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
371    /// supported (such as Cloudflare Workers with Hyperdrive).
372    ///
373    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
374    /// parameter of the list provided, 1-indexed.
375    pub async fn query_typed(
376        &self,
377        query: &str,
378        params: &[(&(dyn ToSql + Sync), Type)],
379    ) -> Result<Vec<Row>, Error> {
380        self.query_typed_raw(query, params.iter().map(|(v, t)| (*v, t.clone())))
381            .await?
382            .try_collect()
383            .await
384    }
385
386    /// The maximally flexible version of [`query_typed`].
387    ///
388    /// Compared to `query`, this method allows performing queries without three round trips (for
389    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
390    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
391    /// supported (such as Cloudflare Workers with Hyperdrive).
392    ///
393    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
394    /// parameter of the list provided, 1-indexed.
395    ///
396    /// [`query_typed`]: #method.query_typed
397    ///
398    /// # Examples
399    ///
400    /// ```no_run
401    /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
402    /// use futures_util::{pin_mut, TryStreamExt};
403    /// use tokio_postgres::types::Type;
404    ///
405    /// let params: Vec<(String, Type)> = vec![
406    ///     ("first param".into(), Type::TEXT),
407    ///     ("second param".into(), Type::TEXT),
408    /// ];
409    /// let mut it = client.query_typed_raw(
410    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
411    ///     params,
412    /// ).await?;
413    ///
414    /// pin_mut!(it);
415    /// while let Some(row) = it.try_next().await? {
416    ///     let foo: i32 = row.get("foo");
417    ///     println!("foo: {}", foo);
418    /// }
419    /// # Ok(())
420    /// # }
421    /// ```
422    pub async fn query_typed_raw<P, I>(&self, query: &str, params: I) -> Result<RowStream, Error>
423    where
424        P: BorrowToSql,
425        I: IntoIterator<Item = (P, Type)>,
426    {
427        query::query_typed(&self.inner, query, params).await
428    }
429
430    /// Executes a statement, returning the number of rows modified.
431    ///
432    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
433    /// provided, 1-indexed.
434    ///
435    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
436    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
437    /// with the `prepare` method.
438    ///
439    /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
440    pub async fn execute<T>(
441        &self,
442        statement: &T,
443        params: &[&(dyn ToSql + Sync)],
444    ) -> Result<u64, Error>
445    where
446        T: ?Sized + ToStatement,
447    {
448        self.execute_raw(statement, slice_iter(params)).await
449    }
450
451    /// The maximally flexible version of [`execute`].
452    ///
453    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
454    /// provided, 1-indexed.
455    ///
456    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
457    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
458    /// with the `prepare` method.
459    ///
460    /// [`execute`]: #method.execute
461    pub async fn execute_raw<T, P, I>(&self, statement: &T, params: I) -> Result<u64, Error>
462    where
463        T: ?Sized + ToStatement,
464        P: BorrowToSql,
465        I: IntoIterator<Item = P>,
466        I::IntoIter: ExactSizeIterator,
467    {
468        let statement = statement.__convert().into_statement(self).await?;
469        query::execute(self.inner(), statement, params).await
470    }
471
472    /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
473    ///
474    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
475    /// be explicitly completed via the `Sink::close` or `finish` methods. If it is not, the copy will be aborted.
476    pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
477    where
478        T: ?Sized + ToStatement,
479        U: Buf + 'static + Send,
480    {
481        let statement = statement.__convert().into_statement(self).await?;
482        copy_in::copy_in(self.inner(), statement).await
483    }
484
485    /// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
486    ///
487    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
488    pub async fn copy_out<T>(&self, statement: &T) -> Result<CopyOutStream, Error>
489    where
490        T: ?Sized + ToStatement,
491    {
492        let statement = statement.__convert().into_statement(self).await?;
493        copy_out::copy_out(self.inner(), statement).await
494    }
495
496    /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
497    ///
498    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
499    /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
500    /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
501    /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
502    /// or a row of data. This preserves the framing between the separate statements in the request.
503    ///
504    /// # Warning
505    ///
506    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
507    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
508    /// them to this method!
509    pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
510        self.simple_query_raw(query).await?.try_collect().await
511    }
512
513    pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
514        simple_query::simple_query(self.inner(), query).await
515    }
516
517    /// Executes a sequence of SQL statements using the simple query protocol.
518    ///
519    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
520    /// point. This is intended for use when, for example, initializing a database schema.
521    ///
522    /// # Warning
523    ///
524    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
525    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
526    /// them to this method!
527    pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
528        simple_query::batch_execute(self.inner(), query).await
529    }
530
531    /// Begins a new database transaction.
532    ///
533    /// The transaction will roll back by default - use the `commit` method to commit it.
534    pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
535        self.build_transaction().start().await
536    }
537
    /// Returns a builder for a transaction with custom settings.
    ///
    /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
    /// attributes. The builder holds the mutable borrow of this client for as long as it lives.
    pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
        TransactionBuilder::new(self)
    }
545
546    /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
547    /// connection associated with this client.
548    pub fn cancel_token(&self) -> CancelToken {
549        CancelToken {
550            #[cfg(feature = "runtime")]
551            socket_config: self.socket_config.clone(),
552            ssl_mode: self.ssl_mode,
553            process_id: self.process_id,
554            secret_key: self.secret_key,
555        }
556    }
557
558    /// Attempts to cancel an in-progress query.
559    ///
560    /// The server provides no information about whether a cancellation attempt was successful or not. An error will
561    /// only be returned if the client was unable to connect to the database.
562    ///
563    /// Requires the `runtime` Cargo feature (enabled by default).
564    #[cfg(feature = "runtime")]
565    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
566    pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
567    where
568        T: MakeTlsConnect<Socket>,
569    {
570        self.cancel_token().cancel_query(tls).await
571    }
572
573    /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
574    /// connection itself.
575    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
576    pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
577    where
578        S: AsyncRead + AsyncWrite + Unpin,
579        T: TlsConnect<S>,
580    {
581        self.cancel_token().cancel_query_raw(stream, tls).await
582    }
583
584    /// Clears the client's type information cache.
585    ///
586    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
587    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
588    /// to flush the local cache and allow the new, updated definitions to be loaded.
589    pub fn clear_type_cache(&self) {
590        self.inner().clear_type_cache();
591    }
592
593    /// Determines if the connection to the server has already closed.
594    ///
595    /// In that case, all future queries will fail.
596    pub fn is_closed(&self) -> bool {
597        self.inner.sender.is_closed()
598    }
599
600    #[doc(hidden)]
601    pub fn __private_api_close(&mut self) {
602        self.inner.sender.close_channel()
603    }
604}
605
606impl fmt::Debug for Client {
607    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
608        f.debug_struct("Client").finish()
609    }
610}