diesel/pg/connection/stmt/
mod.rs

1#![allow(unsafe_code)] // ffi code
2extern crate pq_sys;
3
4use std::ffi::CString;
5use std::os::raw as libc;
6use std::ptr;
7
8use super::result::PgResult;
9use crate::pg::PgTypeMetadata;
10use crate::result::QueryResult;
11
12use super::raw::RawConnection;
13
/// The two ways a [`Statement`] can be sent to the server.
enum StatementKind {
    /// A statement that was not prepared ahead of time. It keeps the SQL
    /// text and the parameter type OIDs so that both can be sent together
    /// with the bind values when the statement is executed.
    Unnamed {
        sql: CString,
        param_types: Vec<u32>,
    },
    /// A statement that has already been prepared on the connection under
    /// `name` and is executed by referring to that name.
    Named {
        name: CString,
    },
}
18
19pub(crate) struct Statement {
20    kind: StatementKind,
21    param_formats: Vec<libc::c_int>,
22}
23
24impl Statement {
25    pub(super) fn execute(
26        &self,
27        raw_connection: &mut RawConnection,
28        param_data: &[Option<Vec<u8>>],
29        row_by_row: bool,
30    ) -> QueryResult<PgResult> {
31        let params_pointer = param_data
32            .iter()
33            .map(|data| {
34                data.as_ref()
35                    .map(|d| d.as_ptr() as *const libc::c_char)
36                    .unwrap_or(ptr::null())
37            })
38            .collect::<Vec<_>>();
39        let param_lengths = param_data
40            .iter()
41            .map(|data| data.as_ref().map(|d| d.len().try_into()).unwrap_or(Ok(0)))
42            .collect::<Result<Vec<_>, _>>()
43            .map_err(|e| crate::result::Error::SerializationError(Box::new(e)))?;
44        let param_count: libc::c_int = params_pointer
45            .len()
46            .try_into()
47            .map_err(|e| crate::result::Error::SerializationError(Box::new(e)))?;
48
49        match &self.kind {
50            StatementKind::Named { name } => {
51                unsafe {
52                    // execute the previously prepared statement
53                    // in autocommit mode, this will be a new transaction
54                    raw_connection.send_query_prepared(
55                        name.as_ptr(),
56                        param_count,
57                        params_pointer.as_ptr(),
58                        param_lengths.as_ptr(),
59                        self.param_formats.as_ptr(),
60                        1,
61                    )
62                }?
63            }
64            StatementKind::Unnamed { sql, param_types } => unsafe {
65                // execute the unnamed prepared statement using send_query_params
66                // which internally calls PQsendQueryParams, making sure the
67                // prepare and execute happens in a single transaction. This
68                // makes sure these are handled by PgBouncer.
69                // See https://github.com/diesel-rs/diesel/pull/4539
70                raw_connection.send_query_params(
71                    sql.as_ptr(),
72                    param_count,
73                    param_types.as_ptr(),
74                    params_pointer.as_ptr(),
75                    param_lengths.as_ptr(),
76                    self.param_formats.as_ptr(),
77                    1,
78                )
79            }?,
80        };
81        if row_by_row {
82            raw_connection.enable_row_by_row_mode()?;
83        }
84        Ok(raw_connection.get_next_result()?.expect("Is never none"))
85    }
86
87    pub(super) fn prepare(
88        raw_connection: &mut RawConnection,
89        sql: &str,
90        name: Option<&str>,
91        param_types: &[PgTypeMetadata],
92    ) -> QueryResult<Self> {
93        let sql = CString::new(sql)?;
94        let param_types_vec = param_types
95            .iter()
96            .map(|x| x.oid())
97            .collect::<Result<Vec<_>, _>>()
98            .map_err(|e| crate::result::Error::SerializationError(Box::new(e)))?;
99
100        if let Some(name) = name {
101            let name = CString::new(name)?;
102            let param_count: libc::c_int = param_types
103                .len()
104                .try_into()
105                .map_err(|e| crate::result::Error::SerializationError(Box::new(e)))?;
106            let internal_result = unsafe {
107                raw_connection.prepare(
108                    name.as_ptr(),
109                    sql.as_ptr(),
110                    param_count,
111                    param_types_to_ptr(Some(&param_types_vec)),
112                )
113            };
114            PgResult::new(internal_result?, raw_connection)?;
115
116            Ok(Statement {
117                kind: StatementKind::Named { name },
118                param_formats: vec![1; param_types.len()],
119            })
120        } else {
121            // For unnamed statements, we'll return a Statement object without
122            // actually preparing it. This allows us to use send_query_params
123            // later in the execute call. This is needed to better interface
124            // with PgBouncer which cannot handle unnamed prepared statements
125            // when those are prepared and executed in separate transactions.
126            // See https://github.com/diesel-rs/diesel/pull/4539
127            Ok(Statement {
128                kind: StatementKind::Unnamed {
129                    sql,
130                    param_types: param_types_vec,
131                },
132                param_formats: vec![1; param_types.len()],
133            })
134        }
135    }
136}
137
138fn param_types_to_ptr(param_types: Option<&Vec<u32>>) -> *const pq_sys::Oid {
139    param_types
140        .map(|types| types.as_ptr())
141        .unwrap_or(ptr::null())
142}