tokio_postgres/transaction_builder.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
use postgres_protocol::message::frontend;
use crate::{codec::FrontendMessage, connection::RequestMessages, Client, Error, Transaction};
/// The isolation level of a database transaction.
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum IsolationLevel {
/// Equivalent to `ReadCommitted`.
ReadUncommitted,
/// An individual statement in the transaction will see rows committed before it began.
ReadCommitted,
/// All statements in the transaction will see the same view of rows committed before the first query in the
/// transaction.
RepeatableRead,
/// The reads and writes in this transaction must be able to be committed as an atomic "unit" with respect to reads
/// and writes of all other concurrent serializable transactions without interleaving.
Serializable,
}
/// A builder for database transactions.
pub struct TransactionBuilder<'a> {
client: &'a mut Client,
isolation_level: Option<IsolationLevel>,
read_only: Option<bool>,
deferrable: Option<bool>,
}
impl<'a> TransactionBuilder<'a> {
pub(crate) fn new(client: &'a mut Client) -> TransactionBuilder<'a> {
TransactionBuilder {
client,
isolation_level: None,
read_only: None,
deferrable: None,
}
}
/// Sets the isolation level of the transaction.
pub fn isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
self.isolation_level = Some(isolation_level);
self
}
/// Sets the access mode of the transaction.
pub fn read_only(mut self, read_only: bool) -> Self {
self.read_only = Some(read_only);
self
}
/// Sets the deferrability of the transaction.
///
/// If the transaction is also serializable and read only, creation of the transaction may block, but when it
/// completes the transaction is able to run with less overhead and a guarantee that it will not be aborted due to
/// serialization failure.
pub fn deferrable(mut self, deferrable: bool) -> Self {
self.deferrable = Some(deferrable);
self
}
/// Begins the transaction.
///
/// The transaction will roll back by default - use the `commit` method to commit it.
pub async fn start(self) -> Result<Transaction<'a>, Error> {
let mut query = "START TRANSACTION".to_string();
let mut first = true;
if let Some(level) = self.isolation_level {
first = false;
query.push_str(" ISOLATION LEVEL ");
let level = match level {
IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
IsolationLevel::ReadCommitted => "READ COMMITTED",
IsolationLevel::RepeatableRead => "REPEATABLE READ",
IsolationLevel::Serializable => "SERIALIZABLE",
};
query.push_str(level);
}
if let Some(read_only) = self.read_only {
if !first {
query.push(',');
}
first = false;
let s = if read_only {
" READ ONLY"
} else {
" READ WRITE"
};
query.push_str(s);
}
if let Some(deferrable) = self.deferrable {
if !first {
query.push(',');
}
let s = if deferrable {
" DEFERRABLE"
} else {
" NOT DEFERRABLE"
};
query.push_str(s);
}
struct RollbackIfNotDone<'me> {
client: &'me Client,
done: bool,
}
impl<'a> Drop for RollbackIfNotDone<'a> {
fn drop(&mut self) {
if self.done {
return;
}
let buf = self.client.inner().with_buf(|buf| {
frontend::query("ROLLBACK", buf).unwrap();
buf.split().freeze()
});
let _ = self
.client
.inner()
.send(RequestMessages::Single(FrontendMessage::Raw(buf)));
}
}
// This is done as `Future` created by this method can be dropped after
// `RequestMessages` is synchronously send to the `Connection` by
// `batch_execute()`, but before `Responses` is asynchronously polled to
// completion. In that case `Transaction` won't be created and thus
// won't be rolled back.
{
let mut cleaner = RollbackIfNotDone {
client: self.client,
done: false,
};
self.client.batch_execute(&query).await?;
cleaner.done = true;
}
Ok(Transaction::new(self.client))
}
}