1#![allow(clippy::too_many_arguments)]
2#![allow(unsafe_code)] extern crate pq_sys;
5
6use self::pq_sys::*;
7use std::ffi::{CStr, CString};
8use std::os::raw as libc;
9use std::ptr::NonNull;
10use std::{ptr, str};
11
12use crate::result::*;
13
14use super::result::PgResult;
15
16#[allow(missing_debug_implementations, missing_copy_implementations)]
17pub(super) struct RawConnection {
18 pub(super) internal_connection: NonNull<PGconn>,
19}
20
21impl RawConnection {
22 pub(super) fn establish(database_url: &str) -> ConnectionResult<Self> {
23 let connection_string = CString::new(database_url)?;
24 let connection_ptr = unsafe { PQconnectdb(connection_string.as_ptr()) };
25 let connection_status = unsafe { PQstatus(connection_ptr) };
26
27 match connection_status {
28 ConnStatusType::CONNECTION_OK => {
29 let connection_ptr = unsafe { NonNull::new_unchecked(connection_ptr) };
30 Ok(RawConnection {
31 internal_connection: connection_ptr,
32 })
33 }
34 _ => {
35 let message = last_error_message(connection_ptr);
36
37 if !connection_ptr.is_null() {
38 unsafe { PQfinish(connection_ptr) }
42 }
43
44 Err(ConnectionError::BadConnection(message))
45 }
46 }
47 }
48
49 pub(super) fn last_error_message(&self) -> String {
50 last_error_message(self.internal_connection.as_ptr())
51 }
52
53 pub(super) fn set_notice_processor(&self, notice_processor: NoticeProcessor) {
54 unsafe {
55 PQsetNoticeProcessor(
56 self.internal_connection.as_ptr(),
57 Some(notice_processor),
58 ptr::null_mut(),
59 );
60 }
61 }
62
63 pub(super) unsafe fn exec(&self, query: *const libc::c_char) -> QueryResult<RawResult> {
64 RawResult::new(PQexec(self.internal_connection.as_ptr(), query), self)
65 }
66
67 pub(super) unsafe fn send_query_params(
72 &self,
73 query: *const libc::c_char,
74 param_count: libc::c_int,
75 param_types: *const Oid,
76 param_values: *const *const libc::c_char,
77 param_lengths: *const libc::c_int,
78 param_formats: *const libc::c_int,
79 result_format: libc::c_int,
80 ) -> QueryResult<()> {
81 let res = PQsendQueryParams(
82 self.internal_connection.as_ptr(),
83 query,
84 param_count,
85 param_types,
86 param_values,
87 param_lengths,
88 param_formats,
89 result_format,
90 );
91 if res == 1 {
92 Ok(())
93 } else {
94 Err(Error::DatabaseError(
95 DatabaseErrorKind::UnableToSendCommand,
96 Box::new(self.last_error_message()),
97 ))
98 }
99 }
100
101 pub(super) unsafe fn send_query_prepared(
102 &self,
103 stmt_name: *const libc::c_char,
104 param_count: libc::c_int,
105 param_values: *const *const libc::c_char,
106 param_lengths: *const libc::c_int,
107 param_formats: *const libc::c_int,
108 result_format: libc::c_int,
109 ) -> QueryResult<()> {
110 let res = PQsendQueryPrepared(
111 self.internal_connection.as_ptr(),
112 stmt_name,
113 param_count,
114 param_values,
115 param_lengths,
116 param_formats,
117 result_format,
118 );
119 if res == 1 {
120 Ok(())
121 } else {
122 Err(Error::DatabaseError(
123 DatabaseErrorKind::UnableToSendCommand,
124 Box::new(self.last_error_message()),
125 ))
126 }
127 }
128
129 pub(super) unsafe fn prepare(
130 &self,
131 stmt_name: *const libc::c_char,
132 query: *const libc::c_char,
133 param_count: libc::c_int,
134 param_types: *const Oid,
135 ) -> QueryResult<RawResult> {
136 let ptr = PQprepare(
137 self.internal_connection.as_ptr(),
138 stmt_name,
139 query,
140 param_count,
141 param_types,
142 );
143 RawResult::new(ptr, self)
144 }
145
146 pub(super) fn transaction_status(&self) -> PgTransactionStatus {
149 unsafe { PQtransactionStatus(self.internal_connection.as_ptr()) }.into()
150 }
151
152 pub(super) fn get_status(&self) -> ConnStatusType {
153 unsafe { PQstatus(self.internal_connection.as_ptr()) }
154 }
155
156 pub(crate) fn get_next_result(&self) -> Result<Option<PgResult>, Error> {
157 let res = unsafe { PQgetResult(self.internal_connection.as_ptr()) };
158 if res.is_null() {
159 Ok(None)
160 } else {
161 let raw = RawResult::new(res, self)?;
162 Ok(Some(PgResult::new(raw, self)?))
163 }
164 }
165
166 pub(crate) fn enable_row_by_row_mode(&self) -> QueryResult<()> {
167 let res = unsafe { PQsetSingleRowMode(self.internal_connection.as_ptr()) };
168 if res == 1 {
169 Ok(())
170 } else {
171 Err(Error::DatabaseError(
172 DatabaseErrorKind::Unknown,
173 Box::new(self.last_error_message()),
174 ))
175 }
176 }
177
178 pub(super) fn put_copy_data(&mut self, buf: &[u8]) -> QueryResult<()> {
179 for c in buf.chunks(i32::MAX as usize) {
180 let res = unsafe {
181 pq_sys::PQputCopyData(
182 self.internal_connection.as_ptr(),
183 c.as_ptr() as *const libc::c_char,
184 c.len()
185 .try_into()
186 .map_err(|e| Error::SerializationError(Box::new(e)))?,
187 )
188 };
189 if res != 1 {
190 return Err(Error::DatabaseError(
191 DatabaseErrorKind::Unknown,
192 Box::new(self.last_error_message()),
193 ));
194 }
195 }
196 Ok(())
197 }
198
199 pub(crate) fn finish_copy_from(&self, err: Option<String>) -> QueryResult<()> {
200 let error = err.map(CString::new).map(|r| {
201 r.unwrap_or_else(|_| {
202 CString::new("Error message contains a \\0 byte")
203 .expect("Does not contain a null byte")
204 })
205 });
206 let error = error
207 .as_ref()
208 .map(|l| l.as_ptr())
209 .unwrap_or(std::ptr::null());
210 let ret = unsafe { pq_sys::PQputCopyEnd(self.internal_connection.as_ptr(), error) };
211 if ret == 1 {
212 Ok(())
213 } else {
214 Err(Error::DatabaseError(
215 DatabaseErrorKind::Unknown,
216 Box::new(self.last_error_message()),
217 ))
218 }
219 }
220}
221
222#[derive(Debug, PartialEq, Eq, Clone, Copy)]
224pub(super) enum PgTransactionStatus {
225 Idle,
227 Active,
229 InTransaction,
231 InError,
233 Unknown,
235}
236
237impl From<PGTransactionStatusType> for PgTransactionStatus {
238 fn from(trans_status_type: PGTransactionStatusType) -> Self {
239 match trans_status_type {
240 PGTransactionStatusType::PQTRANS_IDLE => PgTransactionStatus::Idle,
241 PGTransactionStatusType::PQTRANS_ACTIVE => PgTransactionStatus::Active,
242 PGTransactionStatusType::PQTRANS_INTRANS => PgTransactionStatus::InTransaction,
243 PGTransactionStatusType::PQTRANS_INERROR => PgTransactionStatus::InError,
244 PGTransactionStatusType::PQTRANS_UNKNOWN => PgTransactionStatus::Unknown,
245 }
246 }
247}
248
249pub(super) type NoticeProcessor =
250 extern "C" fn(arg: *mut libc::c_void, message: *const libc::c_char);
251
252impl Drop for RawConnection {
253 fn drop(&mut self) {
254 unsafe { PQfinish(self.internal_connection.as_ptr()) };
255 }
256}
257
258fn last_error_message(conn: *const PGconn) -> String {
259 unsafe {
260 let error_ptr = PQerrorMessage(conn);
261 let bytes = CStr::from_ptr(error_ptr).to_bytes();
262 String::from_utf8_lossy(bytes).to_string()
263 }
264}
265
266#[allow(missing_debug_implementations)]
272pub(super) struct RawResult(NonNull<PGresult>);
273
274unsafe impl Send for RawResult {}
275unsafe impl Sync for RawResult {}
276
277impl RawResult {
278 #[allow(clippy::new_ret_no_self)]
279 fn new(ptr: *mut PGresult, conn: &RawConnection) -> QueryResult<Self> {
280 NonNull::new(ptr).map(RawResult).ok_or_else(|| {
281 Error::DatabaseError(
282 DatabaseErrorKind::UnableToSendCommand,
283 Box::new(conn.last_error_message()),
284 )
285 })
286 }
287
288 pub(super) fn as_ptr(&self) -> *mut PGresult {
289 self.0.as_ptr()
290 }
291
292 pub(super) fn error_message(&self) -> &str {
293 let ptr = unsafe { PQresultErrorMessage(self.0.as_ptr()) };
294 let cstr = unsafe { CStr::from_ptr(ptr) };
295 cstr.to_str().unwrap_or_default()
296 }
297}
298
299impl Drop for RawResult {
300 fn drop(&mut self) {
301 unsafe { PQclear(self.0.as_ptr()) }
302 }
303}