mailspy/src/smtp.rs

354 lines
12 KiB
Rust

use bytes::{Buf, BytesMut};
use mailparse::MailHeaderMap;
use std::fmt::{Display, Write as _};
use std::io::Cursor;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
net::{TcpListener, TcpStream},
};
use crate::mail::{Mail, MailPart, Mailbox};
#[derive(PartialEq, Eq)]
pub enum ConnectionState {
Commands,
Data,
}
#[derive(Debug, Clone)]
pub enum Frame {
Header,
Raw(String),
Ehlo(String),
From(String),
To(String),
Ok(String),
DataStart,
DataEnd,
Quit,
StartMailInput,
Close,
}
impl Display for Frame {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Frame::Ok(v) => write!(f, "250 {}\r\n", v),
Frame::Raw(v) => write!(f, "{}\r\n", v),
Frame::Close => write!(f, "221 Closing connection\r\n"),
Frame::Header => write!(f, "220 Mailspy Test Server\r\n"),
Frame::StartMailInput => write!(f, "354 Start mail input\r\n"),
_ => Ok(()),
}
}
}
impl Frame {
pub fn check(buf: &mut Cursor<&[u8]>) -> anyhow::Result<()> {
get_line(buf)?;
Ok(())
}
pub fn parse(buf: &mut Cursor<&[u8]>) -> anyhow::Result<Frame> {
let line = get_line(buf)?.to_vec();
let string = String::from_utf8(line)?;
if string.to_lowercase().starts_with("ehlo") {
return Ok(Frame::Ehlo(string[3..].to_string()));
}
if string.to_lowercase().starts_with("mail from:") {
return Ok(Frame::From(string[9..].to_string()));
}
if string.to_lowercase().starts_with("rcpt to:") {
return Ok(Frame::To(string[8..].to_string()));
}
if string.to_lowercase().starts_with("data") {
return Ok(Frame::DataStart);
}
if string.to_lowercase().starts_with("quit") {
return Ok(Frame::Quit);
}
if string.to_lowercase().trim().starts_with('.') {
return Ok(Frame::DataEnd);
}
Ok(Frame::Raw(string))
}
}
#[async_trait::async_trait]
pub trait Transmitter {
async fn read_frame(&mut self) -> anyhow::Result<Option<Frame>>;
async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()>;
}
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}
#[async_trait::async_trait]
impl Transmitter for Connection {
async fn read_frame(&mut self) -> anyhow::Result<Option<Frame>> {
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
if 0 == self.stream.read_buf(&mut self.buffer).await? {
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err(anyhow::anyhow!("Connection reset by peer"));
}
}
}
}
async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()> {
self.stream.write_all(frame.to_string().as_bytes()).await?;
self.stream.flush().await?;
Ok(())
}
}
impl Connection {
pub fn new(socket: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(socket),
buffer: BytesMut::with_capacity(4 * 1024),
}
}
fn parse_frame(&mut self) -> anyhow::Result<Option<Frame>> {
let mut buf = Cursor::new(&self.buffer[..]);
match Frame::check(&mut buf) {
Ok(_) => {
let len = buf.position() as usize;
buf.set_position(0);
let frame = Frame::parse(&mut buf)?;
self.buffer.advance(len);
Ok(Some(frame))
}
Err(_) => Ok(None),
}
}
}
pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> {
let addr = format!("0.0.0.0:{}", port);
let listener = TcpListener::bind(&addr).await?;
tracing::info!(port =? port, "SMTP Server running");
tokio::spawn(async move {
loop {
if let Ok((socket, _)) = listener.accept().await {
let mb = mailbox.clone();
tokio::spawn(async move {
if let Err(e) = process(Connection::new(socket), mb).await {
tracing::error!("Mail processing error: {}", e);
}
});
}
}
});
Ok(())
}
async fn process(mut connection: impl Transmitter, mailbox: Mailbox) -> anyhow::Result<()> {
connection.write_frame(&Frame::Header).await.unwrap();
let mut mail_from = String::new();
let mut rcpt_to = String::new();
let mut state = ConnectionState::Commands;
let mut data = String::new();
loop {
if let Some(frame) = connection.read_frame().await.unwrap() {
match frame {
Frame::Ehlo(_) => {
connection
.write_frame(&Frame::Ok("EHLO".to_string()))
.await?;
}
Frame::From(val) => {
mail_from = val;
connection
.write_frame(&Frame::Ok("2.1.0 Sender OK".to_string()))
.await?;
}
Frame::To(val) => {
rcpt_to = val;
connection
.write_frame(&Frame::Ok("2.1.5 Recipient OK".to_string()))
.await?;
}
Frame::DataStart => {
connection.write_frame(&Frame::StartMailInput).await?;
state = ConnectionState::Data;
}
Frame::DataEnd => {
connection
.write_frame(&Frame::Ok("Mail sent".to_string()))
.await?;
let mut mail = Mail::default();
let data = data.clone();
let msg = mailparse::parse_mail(data.as_bytes())?;
if msg.subparts.is_empty() {
mail.body = vec![MailPart {
content_type: msg
.headers
.get_first_value("Content-Type")
.unwrap_or_else(|| "text/plain".to_string()),
data: msg.get_body()?,
}];
} else {
mail.body = msg
.subparts
.iter()
.map(|part| {
part.get_body().map(|body| MailPart {
content_type: part
.headers
.get_first_value("Content-Type")
.unwrap_or_else(|| "text/plain".to_string()),
data: body,
})
})
.collect::<Result<Vec<MailPart>, _>>()?;
}
mail.subject = msg.headers.get_first_value("Subject").unwrap_or_default();
mail.from = msg
.headers
.get_first_value("From")
.unwrap_or_else(|| mail_from.clone());
mail.to = msg
.headers
.get_first_value("To")
.unwrap_or_else(|| rcpt_to.clone());
mail.date = msg.headers.get_first_value("Date").unwrap_or_default();
mailbox.store(mail).await;
connection.write_frame(&Frame::Close).await?;
break Ok(());
}
Frame::Raw(s) if state == ConnectionState::Data => {
writeln!(data, "{}", s)?;
}
_ => {
connection.write_frame(&Frame::Ok("OK".to_string())).await?;
}
}
}
}
}
/// Find a line
fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> anyhow::Result<&'a [u8]> {
if !src.has_remaining() {
return Err(anyhow::anyhow!("Incomplete"));
}
// Scan the bytes directly
let start = src.position() as usize;
// Scan to the second to last byte
let end = src.get_ref().len() - 1;
for i in start..end {
if src.get_ref()[i + 1] == b'\n' {
// We found a line, update the position to be *after* the \n
src.set_position((i + 2) as u64);
// Return the line
return Ok(&src.get_ref()[start..i + 1]);
}
}
Err(anyhow::anyhow!("Incomplete"))
}
#[cfg(test)]
mod tests {
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use super::*;
struct TestConnection {
pub in_rx: UnboundedReceiver<Frame>,
pub out_tx: UnboundedSender<Frame>,
}
#[async_trait::async_trait]
impl Transmitter for TestConnection {
async fn read_frame(&mut self) -> anyhow::Result<Option<Frame>> {
Ok(self.in_rx.recv().await)
}
async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()> {
self.out_tx.send(frame.clone())?;
Ok(())
}
}
impl TestConnection {
pub fn setup() -> (UnboundedSender<Frame>, UnboundedReceiver<Frame>, Self) {
let (in_tx, in_rx) = tokio::sync::mpsc::unbounded_channel();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
let s = Self { in_rx, out_tx };
(in_tx, out_rx, s)
}
}
#[tokio::test]
async fn test_process() {
let mb = Mailbox::new();
let (tx, mut rx, conn) = TestConnection::setup();
tokio::spawn(process(conn, mb.clone()));
assert!(matches!(rx.recv().await, Some(Frame::Header)));
tx.send(Frame::Ehlo("Test mail client".to_string()))
.unwrap();
assert!(matches!(rx.recv().await, Some(Frame::Ok(_))));
tx.send(Frame::From("alice@alison.com".to_string()))
.unwrap();
assert!(matches!(rx.recv().await, Some(Frame::Ok(_))));
tx.send(Frame::To("alice@alison.com".to_string())).unwrap();
assert!(matches!(rx.recv().await, Some(Frame::Ok(_))));
tx.send(Frame::To("alice@alison.com".to_string())).unwrap();
tx.send(Frame::DataStart).unwrap();
tx.send(Frame::Raw("body".to_string())).unwrap();
tx.send(Frame::DataEnd).unwrap();
assert!(matches!(rx.recv().await, Some(Frame::Ok(_))));
}
#[test]
fn test_get_line() {
let mut c = Cursor::new("First line\nSecond line\nThird line\n".as_bytes());
assert_eq!(get_line(&mut c).unwrap(), "First line".as_bytes());
assert_eq!(get_line(&mut c).unwrap(), "Second line".as_bytes());
assert_eq!(get_line(&mut c).unwrap(), "Third line".as_bytes());
}
#[test]
fn test_frame_parse() {
assert!(matches!(
Frame::parse(&mut Cursor::new("EHLO example.com\n".as_bytes())).unwrap(),
Frame::Ehlo(_),
));
assert!(matches!(
Frame::parse(&mut Cursor::new("RCPT TO: alice@example.com\n".as_bytes())).unwrap(),
Frame::To(_),
));
assert!(matches!(
Frame::parse(&mut Cursor::new(
"MAIL FROM: alice@example.com\n".as_bytes()
))
.unwrap(),
Frame::From(_),
));
assert!(matches!(
Frame::parse(&mut Cursor::new("DATA\n".as_bytes())).unwrap(),
Frame::DataStart,
));
assert!(matches!(
Frame::parse(&mut Cursor::new(".\n".as_bytes())).unwrap(),
Frame::DataEnd,
));
assert!(matches!(
Frame::parse(&mut Cursor::new("QUIT\n".as_bytes())).unwrap(),
Frame::Quit,
));
}
}