use bytes::{Buf, BytesMut}; use mailparse::MailHeaderMap; use std::fmt::Write as _; use std::io::Cursor; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufWriter}, net::{TcpListener, TcpStream}, }; use crate::mail::{Mail, MailPart, Mailbox}; #[derive(PartialEq, Eq)] pub enum ConnectionState { Commands, Data, } #[derive(Debug)] pub enum Frame { Header, Raw(String), Ehlo(String), From(String), To(String), Ok(String), DataStart, DataEnd, Quit, StartMailInput, Close, } impl Frame { pub fn check(buf: &mut Cursor<&[u8]>) -> anyhow::Result<()> { get_line(buf)?; Ok(()) } pub fn parse(buf: &mut Cursor<&[u8]>) -> anyhow::Result { let line = get_line(buf)?.to_vec(); let string = String::from_utf8(line)?; if string.to_lowercase().starts_with("ehlo") { return Ok(Frame::Ehlo(string[3..].to_string())); } if string.to_lowercase().starts_with("mail from:") { return Ok(Frame::From(string[9..].to_string())); } if string.to_lowercase().starts_with("rcpt to:") { return Ok(Frame::To(string[8..].to_string())); } if string.to_lowercase().starts_with("data") { return Ok(Frame::DataStart); } if string.to_lowercase().starts_with("quit") { return Ok(Frame::Quit); } if string.to_lowercase().trim().starts_with('.') { return Ok(Frame::DataEnd); } Ok(Frame::Raw(string)) } } pub struct Connection { stream: BufWriter, buffer: BytesMut, mail_from: String, rcpt_to: String, data: String, state: ConnectionState, } impl Connection { pub fn new(socket: TcpStream) -> Connection { Connection { stream: BufWriter::new(socket), buffer: BytesMut::with_capacity(4 * 1024), state: ConnectionState::Commands, mail_from: String::default(), rcpt_to: String::default(), data: String::default(), } } pub async fn read_frame(&mut self) -> anyhow::Result> { loop { if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); } if 0 == self.stream.read_buf(&mut self.buffer).await? { if self.buffer.is_empty() { return Ok(None); } else { return Err(anyhow::anyhow!("Connection reset by peer")); } } } } async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()> { match frame { Frame::Raw(val) => { self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Ok(val) => { self.stream.write_all(b"250 ").await?; self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Close => { self.stream.write_all(b"221 Closing connection").await?; self.stream.write_all(b"\r\n").await?; } Frame::Header => { self.stream.write_all(b"220 Mailspy Test Server").await?; self.stream.write_all(b"\r\n").await?; } Frame::StartMailInput => { self.stream.write_all(b"354 Start mail input").await?; self.stream.write_all(b"\r\n").await?; } _ => {} } self.stream.flush().await?; Ok(()) } fn parse_frame(&mut self) -> anyhow::Result> { let mut buf = Cursor::new(&self.buffer[..]); match Frame::check(&mut buf) { Ok(_) => { let len = buf.position() as usize; buf.set_position(0); let frame = Frame::parse(&mut buf)?; self.buffer.advance(len); Ok(Some(frame)) } Err(_) => Ok(None), } } } pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> { let addr = format!("127.0.0.1:{}", port); let listener = TcpListener::bind(&addr).await?; tracing::info!(port =? port, "SMTP Server running"); tokio::spawn(async move { loop { if let Ok((socket, _)) = listener.accept().await { let mb = mailbox.clone(); tokio::spawn(async move { if let Err(e) = process(socket, mb).await { tracing::error!("Mail processing error: {}", e); } }); } } }); Ok(()) } async fn process(socket: TcpStream, mailbox: Mailbox) -> anyhow::Result<()> { let mut connection = Connection::new(socket); connection.write_frame(&Frame::Header).await.unwrap(); loop { if let Some(frame) = connection.read_frame().await.unwrap() { tracing::info!("Frame read: {:?}", frame); match frame { Frame::Ehlo(_) => { connection .write_frame(&Frame::Ok("Good to go".to_string())) .await?; } Frame::From(val) => { connection.mail_from = val; connection .write_frame(&Frame::Ok("2.1.0 Sender OK".to_string())) .await?; } Frame::To(val) => { connection.rcpt_to = val; connection .write_frame(&Frame::Ok("2.1.5 Recipient OK".to_string())) .await?; } Frame::DataStart => { connection.write_frame(&Frame::StartMailInput).await?; connection.state = ConnectionState::Data; } Frame::DataEnd => { connection .write_frame(&Frame::Ok("Mail sent".to_string())) .await?; let mut mail = Mail::default(); let data = connection.data.clone(); let msg = mailparse::parse_mail(data.as_bytes())?; if msg.subparts.is_empty() { mail.body = vec![MailPart { content_type: msg .headers .get_first_value("Content-Type") .unwrap_or_else(|| "text/plain".to_string()), data: msg.get_body()?, }]; } else { mail.body = msg .subparts .iter() .map(|part| { part.get_body().map(|body| MailPart { content_type: part .headers .get_first_value("Content-Type") .unwrap_or_else(|| "text/plain".to_string()), data: body, }) }) .collect::, _>>()?; } mail.subject = msg.headers.get_first_value("Subject").unwrap_or_default(); mail.from = msg .headers .get_first_value("From") .unwrap_or_else(|| connection.mail_from.clone()); mail.to = msg .headers .get_first_value("To") .unwrap_or_else(|| connection.rcpt_to.clone()); mail.date = msg.headers.get_first_value("Date").unwrap_or_default(); mailbox.store(mail).await; connection.write_frame(&Frame::Close).await?; break Ok(()); } Frame::Raw(s) if connection.state == ConnectionState::Data => { writeln!(connection.data, "{}", s)?; } _ => { connection.write_frame(&Frame::Ok("OK".to_string())).await?; } } } } } /// Find a line fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> anyhow::Result<&'a [u8]> { if !src.has_remaining() { return Err(anyhow::anyhow!("Incomplete")); } // Scan the bytes directly let start = src.position() as usize; // Scan to the second to last byte let end = src.get_ref().len() - 1; for i in start..end { if src.get_ref()[i + 1] == b'\n' { // We found a line, update the position to be *after* the \n src.set_position((i + 2) as u64); // Return the line return Ok(&src.get_ref()[start..i + 1]); } } Err(anyhow::anyhow!("Incomplete")) }