use bytes::{Buf, Bytes, BytesMut};
use fallible_iterator::FallibleIterator;
use postgres_protocol::message::backend;
use postgres_protocol::message::frontend::CopyData;
use std::io;
use tokio_util::codec::{Decoder, Encoder};
pub enum FrontendMessage {
Raw(Bytes),
CopyData(CopyData<Box<dyn Buf + Send>>),
}
pub enum BackendMessage {
Normal {
messages: BackendMessages,
request_complete: bool,
},
Async(backend::Message),
}
pub struct BackendMessages(BytesMut);
impl BackendMessages {
pub fn empty() -> BackendMessages {
BackendMessages(BytesMut::new())
}
}
impl FallibleIterator for BackendMessages {
type Item = backend::Message;
type Error = io::Error;
fn next(&mut self) -> io::Result<Option<backend::Message>> {
backend::Message::parse(&mut self.0)
}
}
pub struct PostgresCodec;
impl Encoder<FrontendMessage> for PostgresCodec {
type Error = io::Error;
fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
match item {
FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
FrontendMessage::CopyData(data) => data.write(dst),
}
Ok(())
}
}
impl Decoder for PostgresCodec {
type Item = BackendMessage;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
let mut idx = 0;
let mut request_complete = false;
while let Some(header) = backend::Header::parse(&src[idx..])? {
let len = header.len() as usize + 1;
if src[idx..].len() < len {
break;
}
match header.tag() {
backend::NOTICE_RESPONSE_TAG
| backend::NOTIFICATION_RESPONSE_TAG
| backend::PARAMETER_STATUS_TAG => {
if idx == 0 {
let message = backend::Message::parse(src)?.unwrap();
return Ok(Some(BackendMessage::Async(message)));
} else {
break;
}
}
_ => {}
}
idx += len;
if header.tag() == backend::READY_FOR_QUERY_TAG {
request_complete = true;
break;
}
}
if idx == 0 {
Ok(None)
} else {
Ok(Some(BackendMessage::Normal {
messages: BackendMessages(src.split_to(idx)),
request_complete,
}))
}
}
}