tokio_postgres/
transaction_builder.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use postgres_protocol::message::frontend;

use crate::{codec::FrontendMessage, connection::RequestMessages, Client, Error, Transaction};

/// The isolation level of a database transaction.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum IsolationLevel {
    /// Equivalent to `ReadCommitted`.
    ReadUncommitted,

    /// An individual statement in the transaction will see rows committed before it began.
    ReadCommitted,

    /// All statements in the transaction will see the same view of rows committed before the first query in the
    /// transaction.
    RepeatableRead,

    /// The reads and writes in this transaction must be able to be committed as an atomic "unit" with respect to reads
    /// and writes of all other concurrent serializable transactions without interleaving.
    Serializable,
}

/// A builder for database transactions.
pub struct TransactionBuilder<'a> {
    client: &'a mut Client,
    isolation_level: Option<IsolationLevel>,
    read_only: Option<bool>,
    deferrable: Option<bool>,
}

impl<'a> TransactionBuilder<'a> {
    pub(crate) fn new(client: &'a mut Client) -> TransactionBuilder<'a> {
        TransactionBuilder {
            client,
            isolation_level: None,
            read_only: None,
            deferrable: None,
        }
    }

    /// Sets the isolation level of the transaction.
    pub fn isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
        self.isolation_level = Some(isolation_level);
        self
    }

    /// Sets the access mode of the transaction.
    pub fn read_only(mut self, read_only: bool) -> Self {
        self.read_only = Some(read_only);
        self
    }

    /// Sets the deferrability of the transaction.
    ///
    /// If the transaction is also serializable and read only, creation of the transaction may block, but when it
    /// completes the transaction is able to run with less overhead and a guarantee that it will not be aborted due to
    /// serialization failure.
    pub fn deferrable(mut self, deferrable: bool) -> Self {
        self.deferrable = Some(deferrable);
        self
    }

    /// Begins the transaction.
    ///
    /// The transaction will roll back by default - use the `commit` method to commit it.
    pub async fn start(self) -> Result<Transaction<'a>, Error> {
        let mut query = "START TRANSACTION".to_string();
        let mut first = true;

        if let Some(level) = self.isolation_level {
            first = false;

            query.push_str(" ISOLATION LEVEL ");
            let level = match level {
                IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
                IsolationLevel::ReadCommitted => "READ COMMITTED",
                IsolationLevel::RepeatableRead => "REPEATABLE READ",
                IsolationLevel::Serializable => "SERIALIZABLE",
            };
            query.push_str(level);
        }

        if let Some(read_only) = self.read_only {
            if !first {
                query.push(',');
            }
            first = false;

            let s = if read_only {
                " READ ONLY"
            } else {
                " READ WRITE"
            };
            query.push_str(s);
        }

        if let Some(deferrable) = self.deferrable {
            if !first {
                query.push(',');
            }

            let s = if deferrable {
                " DEFERRABLE"
            } else {
                " NOT DEFERRABLE"
            };
            query.push_str(s);
        }

        struct RollbackIfNotDone<'me> {
            client: &'me Client,
            done: bool,
        }

        impl<'a> Drop for RollbackIfNotDone<'a> {
            fn drop(&mut self) {
                if self.done {
                    return;
                }

                let buf = self.client.inner().with_buf(|buf| {
                    frontend::query("ROLLBACK", buf).unwrap();
                    buf.split().freeze()
                });
                let _ = self
                    .client
                    .inner()
                    .send(RequestMessages::Single(FrontendMessage::Raw(buf)));
            }
        }

        // This is done as `Future` created by this method can be dropped after
        // `RequestMessages` is synchronously send to the `Connection` by
        // `batch_execute()`, but before `Responses` is asynchronously polled to
        // completion. In that case `Transaction` won't be created and thus
        // won't be rolled back.
        {
            let mut cleaner = RollbackIfNotDone {
                client: self.client,
                done: false,
            };
            self.client.batch_execute(&query).await?;
            cleaner.done = true;
        }

        Ok(Transaction::new(self.client))
    }
}