From 59bce537d7a6d870c96b6f7534e4d1252f0929a2 Mon Sep 17 00:00:00 2001 From: Joe Bellus Date: Wed, 28 Sep 2022 17:47:01 -0400 Subject: [PATCH] Unit Tests Added unit tests for SMTP flow and HTTP endpoints Added test feature to poem Refactored SMTP Connection to use adapter traits to make testing easier. Socket communication is now abstracted away from the command flow processing. --- Cargo.lock | 99 +++++++++++++++++++++++--- Cargo.toml | 3 +- src/http.rs | 52 ++++++++++++++ src/mail.rs | 6 ++ src/smtp.rs | 198 +++++++++++++++++++++++++++++++++++++--------------- 5 files changed, 292 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9ec9c0..1b90f14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,6 +131,12 @@ dependencies = [ "alloc-stdlib", ] +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + [[package]] name = "bytes" version = "1.2.1" @@ -332,6 +338,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.24" @@ -339,6 +360,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -347,6 +369,17 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" +[[package]] +name = "futures-executor" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.24" @@ -382,6 +415,7 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -393,6 +427,18 @@ dependencies = [ "slab", ] +[[package]] +name = "futures_codec" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce54d63f8b0c75023ed920d46fd71d0cbbb830b0ee012726b5b4f506fb6dea5b" +dependencies = [ + "bytes 0.5.6", + "futures", + "memchr", + "pin-project", +] + [[package]] name = "generic-array" version = "0.14.6" @@ -409,7 +455,7 @@ version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be" dependencies = [ - "bytes", + "bytes 1.2.1", "fnv", "futures-core", "futures-sink", @@ -436,7 +482,7 @@ checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" dependencies = [ "base64", "bitflags", - "bytes", + "bytes 1.2.1", "headers-core", "http", "httpdate", @@ -491,7 +537,7 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ - "bytes", + "bytes 1.2.1", "fnv", "itoa", ] @@ -502,7 +548,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes", + "bytes 1.2.1", "http", "pin-project-lite", ] @@ -525,7 +571,7 @@ version = "0.14.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" dependencies = [ - "bytes", + "bytes 1.2.1", "futures-channel", "futures-core", "futures-util", @@ -648,7 +694,8 @@ name = "mailspy" version = "0.1.0" dependencies = [ "anyhow", - "bytes", + "async-trait", + "bytes 1.2.1", "clap", "lettre", "mailparse", @@ -846,6 +893,26 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "pin-project" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef0f924a5ee7ea9cbcea77529dba45f8a9ba9f622419fe3386ca581a3ae9d5a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -872,7 +939,7 @@ checksum = "a94ff00c513bee5c32ecbbf982f470e7e51e913330737dc40522ddc298954395" dependencies = [ "async-compression", "async-trait", - "bytes", + "bytes 1.2.1", "futures-util", "headers", "hex", @@ -891,6 +958,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", + "sse-codec", "thiserror", "tokio", "tokio-stream", @@ -1209,6 +1277,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "sse-codec" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84a59f811350c44b4a037aabeb72dc6a9591fc22aa95a036db9a96297c58085a" +dependencies = [ + "bytes 0.5.6", + "futures-io", + "futures_codec", + "memchr", +] + [[package]] name = "strsim" version = "0.10.0" @@ -1306,7 +1386,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0020c875007ad96677dcc890298f4b942882c5d4eb7cc8f439fc3bf813dc9c95" dependencies = [ "autocfg", - "bytes", + "bytes 1.2.1", "libc", "memchr", "mio", @@ -1348,8 +1428,9 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" dependencies = [ - "bytes", + "bytes 1.2.1", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 090deb5..776c7ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,12 +11,13 @@ serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0.85" tokio = { version = "1.21.1", features = ["full"] } bytes = "1.2.1" -poem = { version = "1.3.44", features = ["compression", "embed"] } +poem = { version = "1.3.44", features = ["compression", "embed", "test"] } rust-embed = "6.4.1" tracing = "0.1.36" tracing-subscriber = "0.3.15" mailparse = "0.13.8" clap = { version = "3.2.22", features = ["derive"] } +async-trait = "0.1.57" [dev-dependencies] lettre = "0.10.1" diff --git a/src/http.rs b/src/http.rs index 8a5bbb7..b4926b8 100644 --- a/src/http.rs +++ b/src/http.rs @@ -40,3 +40,55 @@ pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> { }); Ok(()) } + +#[cfg(test)] +mod tests { + use super::{clear, messages}; + use crate::mail::{Mail, Mailbox}; + use poem::{ + middleware::AddData, + test::{TestClient, TestJson}, + EndpointExt, Route, + }; + + async fn request_data<'a>(url: &str, mb: &Mailbox) -> TestJson { + let app = Route::new() + .at("/messages", messages) + .at("/clear", clear) + .with(AddData::new(mb.clone())); + let client = TestClient::new(app); + let resp = client.get(url).send().await; + resp.assert_status_is_ok(); + resp.json().await + } + + async fn request_void<'a>(url: &str, mb: &Mailbox) { + let app = Route::new() + .at("/messages", messages) + .at("/clear", clear) + .with(AddData::new(mb.clone())); + let client = TestClient::new(app); + let resp = client.get(url).send().await; + resp.assert_status_is_ok(); + } + + #[tokio::test] + async fn test_messages_endpoint() { + let mb = Mailbox::new(); + let resp = request_data("/messages", &mb).await; + let data = resp.value(); + data.array().assert_is_empty(); + mb.store(Mail::default()).await; + let resp = request_data("/messages", &mb).await; + let data = resp.value(); + assert_eq!(data.array().len(), 1); + } + + #[tokio::test] + async fn test_clear_endpoint() { + let mb = Mailbox::new(); + mb.store(Mail::default()).await; + request_void("/clear", &mb).await; + assert_eq!(mb.len().await, 0); + } +} diff --git a/src/mail.rs b/src/mail.rs index c257448..853461c 100644 --- a/src/mail.rs +++ b/src/mail.rs @@ -40,4 +40,10 @@ impl Mailbox { let inner = self.0.lock().await; inner.clone() } + + #[cfg(test)] + pub async fn len(&self) -> usize { + let inner = self.0.lock().await; + inner.len() + } } diff --git a/src/smtp.rs b/src/smtp.rs index 85979f6..38033c8 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -1,6 +1,6 @@ use bytes::{Buf, BytesMut}; use mailparse::MailHeaderMap; -use std::fmt::Write as _; +use std::fmt::{Display, Write as _}; use std::io::Cursor; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufWriter}, @@ -15,7 +15,7 @@ pub enum ConnectionState { Data, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Frame { Header, Raw(String), @@ -30,11 +30,25 @@ pub enum Frame { Close, } +impl Display for Frame { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Frame::Ok(v) => write!(f, "250 {}\r\n", v), + Frame::Raw(v) => write!(f, "{}\r\n", v), + Frame::Close => write!(f, "221 Closing connection\r\n"), + Frame::Header => write!(f, "220 Mailspy Test Server\r\n"), + Frame::StartMailInput => write!(f, "354 Start mail input\r\n"), + _ => Ok(()), + } + } +} + impl Frame { pub fn check(buf: &mut Cursor<&[u8]>) -> anyhow::Result<()> { get_line(buf)?; Ok(()) } + pub fn parse(buf: &mut Cursor<&[u8]>) -> anyhow::Result { let line = get_line(buf)?.to_vec(); let string = String::from_utf8(line)?; @@ -60,27 +74,20 @@ impl Frame { } } +#[async_trait::async_trait] +pub trait Transmitter { + async fn read_frame(&mut self) -> anyhow::Result>; + async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()>; +} + pub struct Connection { stream: BufWriter, buffer: BytesMut, - mail_from: String, - rcpt_to: String, - data: String, - state: ConnectionState, } -impl Connection { - pub fn new(socket: TcpStream) -> Connection { - Connection { - stream: BufWriter::new(socket), - buffer: BytesMut::with_capacity(4 * 1024), - state: ConnectionState::Commands, - mail_from: String::default(), - rcpt_to: String::default(), - data: String::default(), - } - } - pub async fn read_frame(&mut self) -> anyhow::Result> { +#[async_trait::async_trait] +impl Transmitter for Connection { + async fn read_frame(&mut self) -> anyhow::Result> { loop { if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); @@ -97,34 +104,19 @@ impl Connection { } async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()> { - match frame { - Frame::Raw(val) => { - self.stream.write_all(val.as_bytes()).await?; - self.stream.write_all(b"\r\n").await?; - } - Frame::Ok(val) => { - self.stream.write_all(b"250 ").await?; - self.stream.write_all(val.as_bytes()).await?; - self.stream.write_all(b"\r\n").await?; - } - Frame::Close => { - self.stream.write_all(b"221 Closing connection").await?; - self.stream.write_all(b"\r\n").await?; - } - Frame::Header => { - self.stream.write_all(b"220 Mailspy Test Server").await?; - self.stream.write_all(b"\r\n").await?; - } - Frame::StartMailInput => { - self.stream.write_all(b"354 Start mail input").await?; - self.stream.write_all(b"\r\n").await?; - } - _ => {} - } - + self.stream.write_all(frame.to_string().as_bytes()).await?; self.stream.flush().await?; Ok(()) } +} + +impl Connection { + pub fn new(socket: TcpStream) -> Connection { + Connection { + stream: BufWriter::new(socket), + buffer: BytesMut::with_capacity(4 * 1024), + } + } fn parse_frame(&mut self) -> anyhow::Result> { let mut buf = Cursor::new(&self.buffer[..]); @@ -150,7 +142,7 @@ pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> { if let Ok((socket, _)) = listener.accept().await { let mb = mailbox.clone(); tokio::spawn(async move { - if let Err(e) = process(socket, mb).await { + if let Err(e) = process(Connection::new(socket), mb).await { tracing::error!("Mail processing error: {}", e); } }); @@ -160,40 +152,42 @@ pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> { Ok(()) } -async fn process(socket: TcpStream, mailbox: Mailbox) -> anyhow::Result<()> { - let mut connection = Connection::new(socket); +async fn process(mut connection: impl Transmitter, mailbox: Mailbox) -> anyhow::Result<()> { connection.write_frame(&Frame::Header).await.unwrap(); + let mut mail_from = String::new(); + let mut rcpt_to = String::new(); + let mut state = ConnectionState::Commands; + let mut data = String::new(); loop { if let Some(frame) = connection.read_frame().await.unwrap() { - tracing::info!("Frame read: {:?}", frame); match frame { Frame::Ehlo(_) => { connection - .write_frame(&Frame::Ok("Good to go".to_string())) + .write_frame(&Frame::Ok("EHLO".to_string())) .await?; } Frame::From(val) => { - connection.mail_from = val; + mail_from = val; connection .write_frame(&Frame::Ok("2.1.0 Sender OK".to_string())) .await?; } Frame::To(val) => { - connection.rcpt_to = val; + rcpt_to = val; connection .write_frame(&Frame::Ok("2.1.5 Recipient OK".to_string())) .await?; } Frame::DataStart => { connection.write_frame(&Frame::StartMailInput).await?; - connection.state = ConnectionState::Data; + state = ConnectionState::Data; } Frame::DataEnd => { connection .write_frame(&Frame::Ok("Mail sent".to_string())) .await?; let mut mail = Mail::default(); - let data = connection.data.clone(); + let data = data.clone(); let msg = mailparse::parse_mail(data.as_bytes())?; if msg.subparts.is_empty() { mail.body = vec![MailPart { @@ -222,18 +216,18 @@ async fn process(socket: TcpStream, mailbox: Mailbox) -> anyhow::Result<()> { mail.from = msg .headers .get_first_value("From") - .unwrap_or_else(|| connection.mail_from.clone()); + .unwrap_or_else(|| mail_from.clone()); mail.to = msg .headers .get_first_value("To") - .unwrap_or_else(|| connection.rcpt_to.clone()); + .unwrap_or_else(|| rcpt_to.clone()); mail.date = msg.headers.get_first_value("Date").unwrap_or_default(); mailbox.store(mail).await; connection.write_frame(&Frame::Close).await?; break Ok(()); } - Frame::Raw(s) if connection.state == ConnectionState::Data => { - writeln!(connection.data, "{}", s)?; + Frame::Raw(s) if state == ConnectionState::Data => { + writeln!(data, "{}", s)?; } _ => { connection.write_frame(&Frame::Ok("OK".to_string())).await?; @@ -265,3 +259,95 @@ fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> anyhow::Result<&'a [u8]> { Err(anyhow::anyhow!("Incomplete")) } + +#[cfg(test)] +mod tests { + use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + + use super::*; + + struct TestConnection { + pub in_rx: UnboundedReceiver, + pub out_tx: UnboundedSender, + } + + #[async_trait::async_trait] + impl Transmitter for TestConnection { + async fn read_frame(&mut self) -> anyhow::Result> { + Ok(self.in_rx.recv().await) + } + async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()> { + self.out_tx.send(frame.clone())?; + Ok(()) + } + } + + impl TestConnection { + pub fn setup() -> (UnboundedSender, UnboundedReceiver, Self) { + let (in_tx, in_rx) = tokio::sync::mpsc::unbounded_channel(); + let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel(); + let s = Self { in_rx, out_tx }; + (in_tx, out_rx, s) + } + } + + #[tokio::test] + async fn test_process() { + let mb = Mailbox::new(); + let (tx, mut rx, conn) = TestConnection::setup(); + tokio::spawn(process(conn, mb.clone())); + assert!(matches!(rx.recv().await, Some(Frame::Header))); + tx.send(Frame::Ehlo("Test mail client".to_string())) + .unwrap(); + assert!(matches!(rx.recv().await, Some(Frame::Ok(_)))); + tx.send(Frame::From("alice@alison.com".to_string())) + .unwrap(); + assert!(matches!(rx.recv().await, Some(Frame::Ok(_)))); + tx.send(Frame::To("alice@alison.com".to_string())).unwrap(); + assert!(matches!(rx.recv().await, Some(Frame::Ok(_)))); + tx.send(Frame::To("alice@alison.com".to_string())).unwrap(); + tx.send(Frame::DataStart).unwrap(); + tx.send(Frame::Raw("body".to_string())).unwrap(); + tx.send(Frame::DataEnd).unwrap(); + assert!(matches!(rx.recv().await, Some(Frame::Ok(_)))); + } + + #[test] + fn test_get_line() { + let mut c = Cursor::new("First line\nSecond line\nThird line\n".as_bytes()); + assert_eq!(get_line(&mut c).unwrap(), "First line".as_bytes()); + assert_eq!(get_line(&mut c).unwrap(), "Second line".as_bytes()); + assert_eq!(get_line(&mut c).unwrap(), "Third line".as_bytes()); + } + + #[test] + fn test_frame_parse() { + assert!(matches!( + Frame::parse(&mut Cursor::new("EHLO example.com\n".as_bytes())).unwrap(), + Frame::Ehlo(_), + )); + assert!(matches!( + Frame::parse(&mut Cursor::new("RCPT TO: alice@example.com\n".as_bytes())).unwrap(), + Frame::To(_), + )); + assert!(matches!( + Frame::parse(&mut Cursor::new( + "MAIL FROM: alice@example.com\n".as_bytes() + )) + .unwrap(), + Frame::From(_), + )); + assert!(matches!( + Frame::parse(&mut Cursor::new("DATA\n".as_bytes())).unwrap(), + Frame::DataStart, + )); + assert!(matches!( + Frame::parse(&mut Cursor::new(".\n".as_bytes())).unwrap(), + Frame::DataEnd, + )); + assert!(matches!( + Frame::parse(&mut Cursor::new("QUIT\n".as_bytes())).unwrap(), + Frame::Quit, + )); + } +}