diesel/pg/connection/
raw.rs

1#![allow(clippy::too_many_arguments)]
2#![allow(unsafe_code)] // ffi code
3
4extern crate pq_sys;
5
6use self::pq_sys::*;
7use std::ffi::{CStr, CString};
8use std::os::raw as libc;
9use std::ptr::NonNull;
10use std::{ptr, str};
11
12use crate::result::*;
13
14use super::result::PgResult;
15
16#[allow(missing_debug_implementations, missing_copy_implementations)]
17pub(super) struct RawConnection {
18    pub(super) internal_connection: NonNull<PGconn>,
19}
20
21impl RawConnection {
22    pub(super) fn establish(database_url: &str) -> ConnectionResult<Self> {
23        let connection_string = CString::new(database_url)?;
24        let connection_ptr = unsafe { PQconnectdb(connection_string.as_ptr()) };
25        let connection_status = unsafe { PQstatus(connection_ptr) };
26
27        match connection_status {
28            ConnStatusType::CONNECTION_OK => {
29                let connection_ptr = unsafe { NonNull::new_unchecked(connection_ptr) };
30                Ok(RawConnection {
31                    internal_connection: connection_ptr,
32                })
33            }
34            _ => {
35                let message = last_error_message(connection_ptr);
36
37                if !connection_ptr.is_null() {
38                    // Note that even if the server connection attempt fails (as indicated by PQstatus),
39                    // the application should call PQfinish to free the memory used by the PGconn object.
40                    // https://www.postgresql.org/docs/current/libpq-connect.html
41                    unsafe { PQfinish(connection_ptr) }
42                }
43
44                Err(ConnectionError::BadConnection(message))
45            }
46        }
47    }
48
49    pub(super) fn last_error_message(&self) -> String {
50        last_error_message(self.internal_connection.as_ptr())
51    }
52
53    pub(super) fn set_notice_processor(&self, notice_processor: NoticeProcessor) {
54        unsafe {
55            PQsetNoticeProcessor(
56                self.internal_connection.as_ptr(),
57                Some(notice_processor),
58                ptr::null_mut(),
59            );
60        }
61    }
62
63    pub(super) unsafe fn exec(&self, query: *const libc::c_char) -> QueryResult<RawResult> {
64        RawResult::new(PQexec(self.internal_connection.as_ptr(), query), self)
65    }
66
67    /// Sends a query and parameters to the server without using the prepare/bind cycle.
68    ///
69    /// This method uses PQsendQueryParams which combines the prepare and bind steps
70    /// and is more compatible with connection poolers like PgBouncer.
71    pub(super) unsafe fn send_query_params(
72        &self,
73        query: *const libc::c_char,
74        param_count: libc::c_int,
75        param_types: *const Oid,
76        param_values: *const *const libc::c_char,
77        param_lengths: *const libc::c_int,
78        param_formats: *const libc::c_int,
79        result_format: libc::c_int,
80    ) -> QueryResult<()> {
81        let res = PQsendQueryParams(
82            self.internal_connection.as_ptr(),
83            query,
84            param_count,
85            param_types,
86            param_values,
87            param_lengths,
88            param_formats,
89            result_format,
90        );
91        if res == 1 {
92            Ok(())
93        } else {
94            Err(Error::DatabaseError(
95                DatabaseErrorKind::UnableToSendCommand,
96                Box::new(self.last_error_message()),
97            ))
98        }
99    }
100
101    pub(super) unsafe fn send_query_prepared(
102        &self,
103        stmt_name: *const libc::c_char,
104        param_count: libc::c_int,
105        param_values: *const *const libc::c_char,
106        param_lengths: *const libc::c_int,
107        param_formats: *const libc::c_int,
108        result_format: libc::c_int,
109    ) -> QueryResult<()> {
110        let res = PQsendQueryPrepared(
111            self.internal_connection.as_ptr(),
112            stmt_name,
113            param_count,
114            param_values,
115            param_lengths,
116            param_formats,
117            result_format,
118        );
119        if res == 1 {
120            Ok(())
121        } else {
122            Err(Error::DatabaseError(
123                DatabaseErrorKind::UnableToSendCommand,
124                Box::new(self.last_error_message()),
125            ))
126        }
127    }
128
129    pub(super) unsafe fn prepare(
130        &self,
131        stmt_name: *const libc::c_char,
132        query: *const libc::c_char,
133        param_count: libc::c_int,
134        param_types: *const Oid,
135    ) -> QueryResult<RawResult> {
136        let ptr = PQprepare(
137            self.internal_connection.as_ptr(),
138            stmt_name,
139            query,
140            param_count,
141            param_types,
142        );
143        RawResult::new(ptr, self)
144    }
145
146    /// This is reasonably inexpensive as it just accesses variables internal to the connection
147    /// that are kept up to date by the `ReadyForQuery` messages from the PG server
148    pub(super) fn transaction_status(&self) -> PgTransactionStatus {
149        unsafe { PQtransactionStatus(self.internal_connection.as_ptr()) }.into()
150    }
151
152    pub(super) fn get_status(&self) -> ConnStatusType {
153        unsafe { PQstatus(self.internal_connection.as_ptr()) }
154    }
155
156    pub(crate) fn get_next_result(&self) -> Result<Option<PgResult>, Error> {
157        let res = unsafe { PQgetResult(self.internal_connection.as_ptr()) };
158        if res.is_null() {
159            Ok(None)
160        } else {
161            let raw = RawResult::new(res, self)?;
162            Ok(Some(PgResult::new(raw, self)?))
163        }
164    }
165
166    pub(crate) fn enable_row_by_row_mode(&self) -> QueryResult<()> {
167        let res = unsafe { PQsetSingleRowMode(self.internal_connection.as_ptr()) };
168        if res == 1 {
169            Ok(())
170        } else {
171            Err(Error::DatabaseError(
172                DatabaseErrorKind::Unknown,
173                Box::new(self.last_error_message()),
174            ))
175        }
176    }
177
178    pub(super) fn put_copy_data(&mut self, buf: &[u8]) -> QueryResult<()> {
179        for c in buf.chunks(i32::MAX as usize) {
180            let res = unsafe {
181                pq_sys::PQputCopyData(
182                    self.internal_connection.as_ptr(),
183                    c.as_ptr() as *const libc::c_char,
184                    c.len()
185                        .try_into()
186                        .map_err(|e| Error::SerializationError(Box::new(e)))?,
187                )
188            };
189            if res != 1 {
190                return Err(Error::DatabaseError(
191                    DatabaseErrorKind::Unknown,
192                    Box::new(self.last_error_message()),
193                ));
194            }
195        }
196        Ok(())
197    }
198
199    pub(crate) fn finish_copy_from(&self, err: Option<String>) -> QueryResult<()> {
200        let error = err.map(CString::new).map(|r| {
201            r.unwrap_or_else(|_| {
202                CString::new("Error message contains a \\0 byte")
203                    .expect("Does not contain a null byte")
204            })
205        });
206        let error = error
207            .as_ref()
208            .map(|l| l.as_ptr())
209            .unwrap_or(std::ptr::null());
210        let ret = unsafe { pq_sys::PQputCopyEnd(self.internal_connection.as_ptr(), error) };
211        if ret == 1 {
212            Ok(())
213        } else {
214            Err(Error::DatabaseError(
215                DatabaseErrorKind::Unknown,
216                Box::new(self.last_error_message()),
217            ))
218        }
219    }
220}
221
222/// Represents the current in-transaction status of the connection
223#[derive(Debug, PartialEq, Eq, Clone, Copy)]
224pub(super) enum PgTransactionStatus {
225    /// Currently idle
226    Idle,
227    /// A command is in progress (sent to the server but not yet completed)
228    Active,
229    /// Idle, in a valid transaction block
230    InTransaction,
231    /// Idle, in a failed transaction block
232    InError,
233    /// Bad connection
234    Unknown,
235}
236
237impl From<PGTransactionStatusType> for PgTransactionStatus {
238    fn from(trans_status_type: PGTransactionStatusType) -> Self {
239        match trans_status_type {
240            PGTransactionStatusType::PQTRANS_IDLE => PgTransactionStatus::Idle,
241            PGTransactionStatusType::PQTRANS_ACTIVE => PgTransactionStatus::Active,
242            PGTransactionStatusType::PQTRANS_INTRANS => PgTransactionStatus::InTransaction,
243            PGTransactionStatusType::PQTRANS_INERROR => PgTransactionStatus::InError,
244            PGTransactionStatusType::PQTRANS_UNKNOWN => PgTransactionStatus::Unknown,
245        }
246    }
247}
248
249pub(super) type NoticeProcessor =
250    extern "C" fn(arg: *mut libc::c_void, message: *const libc::c_char);
251
252impl Drop for RawConnection {
253    fn drop(&mut self) {
254        unsafe { PQfinish(self.internal_connection.as_ptr()) };
255    }
256}
257
258fn last_error_message(conn: *const PGconn) -> String {
259    unsafe {
260        let error_ptr = PQerrorMessage(conn);
261        let bytes = CStr::from_ptr(error_ptr).to_bytes();
262        String::from_utf8_lossy(bytes).to_string()
263    }
264}
265
266/// Internal wrapper around a `*mut PGresult` which is known to be not-null, and
267/// have no aliases.  This wrapper is to ensure that it's always properly
268/// dropped.
269///
270/// If `Unique` is ever stabilized, we should use it here.
271#[allow(missing_debug_implementations)]
272pub(super) struct RawResult(NonNull<PGresult>);
273
274unsafe impl Send for RawResult {}
275unsafe impl Sync for RawResult {}
276
277impl RawResult {
278    #[allow(clippy::new_ret_no_self)]
279    fn new(ptr: *mut PGresult, conn: &RawConnection) -> QueryResult<Self> {
280        NonNull::new(ptr).map(RawResult).ok_or_else(|| {
281            Error::DatabaseError(
282                DatabaseErrorKind::UnableToSendCommand,
283                Box::new(conn.last_error_message()),
284            )
285        })
286    }
287
288    pub(super) fn as_ptr(&self) -> *mut PGresult {
289        self.0.as_ptr()
290    }
291
292    pub(super) fn error_message(&self) -> &str {
293        let ptr = unsafe { PQresultErrorMessage(self.0.as_ptr()) };
294        let cstr = unsafe { CStr::from_ptr(ptr) };
295        cstr.to_str().unwrap_or_default()
296    }
297}
298
299impl Drop for RawResult {
300    fn drop(&mut self) {
301        unsafe { PQclear(self.0.as_ptr()) }
302    }
303}