From 59bce537d7a6d870c96b6f7534e4d1252f0929a2 Mon Sep 17 00:00:00 2001 From: Joe Bellus Date: Wed, 28 Sep 2022 17:47:01 -0400 Subject: [PATCH 1/4] Unit Tests Added unit tests for SMTP flow and HTTP endpoints Added test feature to poem Refactored SMTP Connection to use adapter traits to make testing easier. Socket communication is now abstracted away from the command flow processing. --- Cargo.lock | 99 +++++++++++++++++++++++--- Cargo.toml | 3 +- src/http.rs | 52 ++++++++++++++ src/mail.rs | 6 ++ src/smtp.rs | 198 +++++++++++++++++++++++++++++++++++++--------------- 5 files changed, 292 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9ec9c0..1b90f14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,6 +131,12 @@ dependencies = [ "alloc-stdlib", ] +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + [[package]] name = "bytes" version = "1.2.1" @@ -332,6 +338,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.24" @@ -339,6 +360,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -347,6 +369,17 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" +[[package]] +name = "futures-executor" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.24" @@ -382,6 +415,7 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -393,6 +427,18 @@ dependencies = [ "slab", ] +[[package]] +name = "futures_codec" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce54d63f8b0c75023ed920d46fd71d0cbbb830b0ee012726b5b4f506fb6dea5b" +dependencies = [ + "bytes 0.5.6", + "futures", + "memchr", + "pin-project", +] + [[package]] name = "generic-array" version = "0.14.6" @@ -409,7 +455,7 @@ version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be" dependencies = [ - "bytes", + "bytes 1.2.1", "fnv", "futures-core", "futures-sink", @@ -436,7 +482,7 @@ checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" dependencies = [ "base64", "bitflags", - "bytes", + "bytes 1.2.1", "headers-core", "http", "httpdate", @@ -491,7 +537,7 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ - "bytes", + "bytes 1.2.1", "fnv", "itoa", ] @@ -502,7 +548,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes", + "bytes 1.2.1", "http", "pin-project-lite", ] @@ -525,7 +571,7 @@ version = "0.14.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" dependencies = [ - "bytes", + "bytes 1.2.1", "futures-channel", "futures-core", "futures-util", @@ -648,7 +694,8 @@ name = "mailspy" version = "0.1.0" dependencies = [ "anyhow", - "bytes", + "async-trait", + "bytes 1.2.1", "clap", "lettre", "mailparse", @@ -846,6 +893,26 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "pin-project" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef0f924a5ee7ea9cbcea77529dba45f8a9ba9f622419fe3386ca581a3ae9d5a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.9" @@ -872,7 +939,7 @@ checksum = "a94ff00c513bee5c32ecbbf982f470e7e51e913330737dc40522ddc298954395" dependencies = [ "async-compression", "async-trait", - "bytes", + "bytes 1.2.1", "futures-util", "headers", "hex", @@ -891,6 +958,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", + "sse-codec", "thiserror", "tokio", "tokio-stream", @@ -1209,6 +1277,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "sse-codec" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84a59f811350c44b4a037aabeb72dc6a9591fc22aa95a036db9a96297c58085a" +dependencies = [ + "bytes 0.5.6", + "futures-io", + "futures_codec", + "memchr", +] + [[package]] name = "strsim" version = "0.10.0" @@ -1306,7 +1386,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0020c875007ad96677dcc890298f4b942882c5d4eb7cc8f439fc3bf813dc9c95" dependencies = [ "autocfg", - "bytes", + "bytes 1.2.1", "libc", "memchr", "mio", @@ -1348,8 +1428,9 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" dependencies = [ - "bytes", + "bytes 1.2.1", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 090deb5..776c7ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,12 +11,13 @@ serde = { version = "1.0.145", features = ["derive"] } serde_json = "1.0.85" tokio = { version = "1.21.1", features = ["full"] } bytes = "1.2.1" -poem = { version = "1.3.44", features = ["compression", "embed"] } +poem = { version = "1.3.44", features = ["compression", "embed", "test"] } rust-embed = "6.4.1" tracing = "0.1.36" tracing-subscriber = "0.3.15" mailparse = "0.13.8" clap = { version = "3.2.22", features = ["derive"] } +async-trait = "0.1.57" [dev-dependencies] lettre = "0.10.1" diff --git a/src/http.rs b/src/http.rs index 8a5bbb7..b4926b8 100644 --- a/src/http.rs +++ b/src/http.rs @@ -40,3 +40,55 @@ pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> { }); Ok(()) } + +#[cfg(test)] +mod tests { + use super::{clear, messages}; + use crate::mail::{Mail, Mailbox}; + use poem::{ + middleware::AddData, + test::{TestClient, TestJson}, + EndpointExt, Route, + }; + + async fn request_data<'a>(url: &str, mb: &Mailbox) -> TestJson { + let app = Route::new() + .at("/messages", messages) + .at("/clear", clear) + .with(AddData::new(mb.clone())); + let client = TestClient::new(app); + let resp = client.get(url).send().await; + resp.assert_status_is_ok(); + resp.json().await + } + + async fn request_void<'a>(url: &str, mb: &Mailbox) { + let app = Route::new() + .at("/messages", messages) + .at("/clear", clear) + .with(AddData::new(mb.clone())); + let client = TestClient::new(app); + let resp = client.get(url).send().await; + resp.assert_status_is_ok(); + } + + #[tokio::test] + async fn test_messages_endpoint() { + let mb = Mailbox::new(); + let resp = request_data("/messages", &mb).await; + let data = resp.value(); + data.array().assert_is_empty(); + mb.store(Mail::default()).await; + let resp = request_data("/messages", &mb).await; + let data = resp.value(); + assert_eq!(data.array().len(), 1); + } + + #[tokio::test] + async fn test_clear_endpoint() { + let mb = Mailbox::new(); + mb.store(Mail::default()).await; + request_void("/clear", &mb).await; + assert_eq!(mb.len().await, 0); + } +} diff --git a/src/mail.rs b/src/mail.rs index c257448..853461c 100644 --- a/src/mail.rs +++ b/src/mail.rs @@ -40,4 +40,10 @@ impl Mailbox { let inner = self.0.lock().await; inner.clone() } + + #[cfg(test)] + pub async fn len(&self) -> usize { + let inner = self.0.lock().await; + inner.len() + } } diff --git a/src/smtp.rs b/src/smtp.rs index 85979f6..38033c8 100644 --- a/src/smtp.rs +++ b/src/smtp.rs @@ -1,6 +1,6 @@ use bytes::{Buf, BytesMut}; use mailparse::MailHeaderMap; -use std::fmt::Write as _; +use std::fmt::{Display, Write as _}; use std::io::Cursor; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufWriter}, @@ -15,7 +15,7 @@ pub enum ConnectionState { Data, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Frame { Header, Raw(String), @@ -30,11 +30,25 @@ pub enum Frame { Close, } +impl Display for Frame { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Frame::Ok(v) => write!(f, "250 {}\r\n", v), + Frame::Raw(v) => write!(f, "{}\r\n", v), + Frame::Close => write!(f, "221 Closing connection\r\n"), + Frame::Header => write!(f, "220 Mailspy Test Server\r\n"), + Frame::StartMailInput => write!(f, "354 Start mail input\r\n"), + _ => Ok(()), + } + } +} + impl Frame { pub fn check(buf: &mut Cursor<&[u8]>) -> anyhow::Result<()> { get_line(buf)?; Ok(()) } + pub fn parse(buf: &mut Cursor<&[u8]>) -> anyhow::Result { let line = get_line(buf)?.to_vec(); let string = String::from_utf8(line)?; @@ -60,27 +74,20 @@ impl Frame { } } +#[async_trait::async_trait] +pub trait Transmitter { + async fn read_frame(&mut self) -> anyhow::Result>; + async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()>; +} + pub struct Connection { stream: BufWriter, buffer: BytesMut, - mail_from: String, - rcpt_to: String, - data: String, - state: ConnectionState, } -impl Connection { - pub fn new(socket: TcpStream) -> Connection { - Connection { - stream: BufWriter::new(socket), - buffer: BytesMut::with_capacity(4 * 1024), - state: ConnectionState::Commands, - mail_from: String::default(), - rcpt_to: String::default(), - data: String::default(), - } - } - pub async fn read_frame(&mut self) -> anyhow::Result> { +#[async_trait::async_trait] +impl Transmitter for Connection { + async fn read_frame(&mut self) -> anyhow::Result> { loop { if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); @@ -97,34 +104,19 @@ impl Connection { } async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()> { - match frame { - Frame::Raw(val) => { - self.stream.write_all(val.as_bytes()).await?; - self.stream.write_all(b"\r\n").await?; - } - Frame::Ok(val) => { - self.stream.write_all(b"250 ").await?; - self.stream.write_all(val.as_bytes()).await?; - self.stream.write_all(b"\r\n").await?; - } - Frame::Close => { - self.stream.write_all(b"221 Closing connection").await?; - self.stream.write_all(b"\r\n").await?; - } - Frame::Header => { - self.stream.write_all(b"220 Mailspy Test Server").await?; - self.stream.write_all(b"\r\n").await?; - } - Frame::StartMailInput => { - self.stream.write_all(b"354 Start mail input").await?; - self.stream.write_all(b"\r\n").await?; - } - _ => {} - } - + self.stream.write_all(frame.to_string().as_bytes()).await?; self.stream.flush().await?; Ok(()) } +} + +impl Connection { + pub fn new(socket: TcpStream) -> Connection { + Connection { + stream: BufWriter::new(socket), + buffer: BytesMut::with_capacity(4 * 1024), + } + } fn parse_frame(&mut self) -> anyhow::Result> { let mut buf = Cursor::new(&self.buffer[..]); @@ -150,7 +142,7 @@ pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> { if let Ok((socket, _)) = listener.accept().await { let mb = mailbox.clone(); tokio::spawn(async move { - if let Err(e) = process(socket, mb).await { + if let Err(e) = process(Connection::new(socket), mb).await { tracing::error!("Mail processing error: {}", e); } }); @@ -160,40 +152,42 @@ pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> { Ok(()) } -async fn process(socket: TcpStream, mailbox: Mailbox) -> anyhow::Result<()> { - let mut connection = Connection::new(socket); +async fn process(mut connection: impl Transmitter, mailbox: Mailbox) -> anyhow::Result<()> { connection.write_frame(&Frame::Header).await.unwrap(); + let mut mail_from = String::new(); + let mut rcpt_to = String::new(); + let mut state = ConnectionState::Commands; + let mut data = String::new(); loop { if let Some(frame) = connection.read_frame().await.unwrap() { - tracing::info!("Frame read: {:?}", frame); match frame { Frame::Ehlo(_) => { connection - .write_frame(&Frame::Ok("Good to go".to_string())) + .write_frame(&Frame::Ok("EHLO".to_string())) .await?; } Frame::From(val) => { - connection.mail_from = val; + mail_from = val; connection .write_frame(&Frame::Ok("2.1.0 Sender OK".to_string())) .await?; } Frame::To(val) => { - connection.rcpt_to = val; + rcpt_to = val; connection .write_frame(&Frame::Ok("2.1.5 Recipient OK".to_string())) .await?; } Frame::DataStart => { connection.write_frame(&Frame::StartMailInput).await?; - connection.state = ConnectionState::Data; + state = ConnectionState::Data; } Frame::DataEnd => { connection .write_frame(&Frame::Ok("Mail sent".to_string())) .await?; let mut mail = Mail::default(); - let data = connection.data.clone(); + let data = data.clone(); let msg = mailparse::parse_mail(data.as_bytes())?; if msg.subparts.is_empty() { mail.body = vec![MailPart { @@ -222,18 +216,18 @@ async fn process(socket: TcpStream, mailbox: Mailbox) -> anyhow::Result<()> { mail.from = msg .headers .get_first_value("From") - .unwrap_or_else(|| connection.mail_from.clone()); + .unwrap_or_else(|| mail_from.clone()); mail.to = msg .headers .get_first_value("To") - .unwrap_or_else(|| connection.rcpt_to.clone()); + .unwrap_or_else(|| rcpt_to.clone()); mail.date = msg.headers.get_first_value("Date").unwrap_or_default(); mailbox.store(mail).await; connection.write_frame(&Frame::Close).await?; break Ok(()); } - Frame::Raw(s) if connection.state == ConnectionState::Data => { - writeln!(connection.data, "{}", s)?; + Frame::Raw(s) if state == ConnectionState::Data => { + writeln!(data, "{}", s)?; } _ => { connection.write_frame(&Frame::Ok("OK".to_string())).await?; @@ -265,3 +259,95 @@ fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> anyhow::Result<&'a [u8]> { Err(anyhow::anyhow!("Incomplete")) } + +#[cfg(test)] +mod tests { + use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + + use super::*; + + struct TestConnection { + pub in_rx: UnboundedReceiver, + pub out_tx: UnboundedSender, + } + + #[async_trait::async_trait] + impl Transmitter for TestConnection { + async fn read_frame(&mut self) -> anyhow::Result> { + Ok(self.in_rx.recv().await) + } + async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()> { + self.out_tx.send(frame.clone())?; + Ok(()) + } + } + + impl TestConnection { + pub fn setup() -> (UnboundedSender, UnboundedReceiver, Self) { + let (in_tx, in_rx) = tokio::sync::mpsc::unbounded_channel(); + let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel(); + let s = Self { in_rx, out_tx }; + (in_tx, out_rx, s) + } + } + + #[tokio::test] + async fn test_process() { + let mb = Mailbox::new(); + let (tx, mut rx, conn) = TestConnection::setup(); + tokio::spawn(process(conn, mb.clone())); + assert!(matches!(rx.recv().await, Some(Frame::Header))); + tx.send(Frame::Ehlo("Test mail client".to_string())) + .unwrap(); + assert!(matches!(rx.recv().await, Some(Frame::Ok(_)))); + tx.send(Frame::From("alice@alison.com".to_string())) + .unwrap(); + assert!(matches!(rx.recv().await, Some(Frame::Ok(_)))); + tx.send(Frame::To("alice@alison.com".to_string())).unwrap(); + assert!(matches!(rx.recv().await, Some(Frame::Ok(_)))); + tx.send(Frame::To("alice@alison.com".to_string())).unwrap(); + tx.send(Frame::DataStart).unwrap(); + tx.send(Frame::Raw("body".to_string())).unwrap(); + tx.send(Frame::DataEnd).unwrap(); + assert!(matches!(rx.recv().await, Some(Frame::Ok(_)))); + } + + #[test] + fn test_get_line() { + let mut c = Cursor::new("First line\nSecond line\nThird line\n".as_bytes()); + assert_eq!(get_line(&mut c).unwrap(), "First line".as_bytes()); + assert_eq!(get_line(&mut c).unwrap(), "Second line".as_bytes()); + assert_eq!(get_line(&mut c).unwrap(), "Third line".as_bytes()); + } + + #[test] + fn test_frame_parse() { + assert!(matches!( + Frame::parse(&mut Cursor::new("EHLO example.com\n".as_bytes())).unwrap(), + Frame::Ehlo(_), + )); + assert!(matches!( + Frame::parse(&mut Cursor::new("RCPT TO: alice@example.com\n".as_bytes())).unwrap(), + Frame::To(_), + )); + assert!(matches!( + Frame::parse(&mut Cursor::new( + "MAIL FROM: alice@example.com\n".as_bytes() + )) + .unwrap(), + Frame::From(_), + )); + assert!(matches!( + Frame::parse(&mut Cursor::new("DATA\n".as_bytes())).unwrap(), + Frame::DataStart, + )); + assert!(matches!( + Frame::parse(&mut Cursor::new(".\n".as_bytes())).unwrap(), + Frame::DataEnd, + )); + assert!(matches!( + Frame::parse(&mut Cursor::new("QUIT\n".as_bytes())).unwrap(), + Frame::Quit, + )); + } +} From 77847b077a387ad961e9170ae5d2dd6b95f431a6 Mon Sep 17 00:00:00 2001 From: Joe Bellus Date: Wed, 28 Sep 2022 17:49:22 -0400 Subject: [PATCH 2/4] Fixed issue with horizontal scrollbars in the UI --- ui/src/routes/+layout.svelte | 1 + 1 file changed, 1 insertion(+) diff --git a/ui/src/routes/+layout.svelte b/ui/src/routes/+layout.svelte index 12e68f8..4f111e4 100644 --- a/ui/src/routes/+layout.svelte +++ b/ui/src/routes/+layout.svelte @@ -13,5 +13,6 @@ :global(html, body) { padding: 0; margin: 0; + overflow-x: hidden; } From e9a70b8ab81a1efdd9cec9e2438d0cd65a48d59f Mon Sep 17 00:00:00 2001 From: Joe Bellus Date: Wed, 28 Sep 2022 17:56:43 -0400 Subject: [PATCH 3/4] Added triggers to CI for push and promote This was done to prevent duplicate CI runs when pushing to PRs --- .drone.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.drone.yml b/.drone.yml index 5da00a6..e5c3775 100644 --- a/.drone.yml +++ b/.drone.yml @@ -1,6 +1,11 @@ kind: pipeline name: default +trigger: + event: + - push + - promote + steps: - name: test image: rust:latest From 43317da2543a2e9f3a4e46291219f8d4dc1e1b08 Mon Sep 17 00:00:00 2001 From: Joe Bellus Date: Wed, 28 Sep 2022 18:07:32 -0400 Subject: [PATCH 4/4] Updated README --- README.org | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.org b/README.org index 6150384..576b1e6 100644 --- a/README.org +++ b/README.org @@ -1,6 +1,10 @@ * Mailspy -Mailspy is a mock SMTP server for used during development of applications that send email. Mailspy will serve a compliant, mock SMTP server that an application can connect to, while in development mode. It will accept emails and display them in a web interface that it serves. +Mailspy is a mock SMTP server for used during the development of applications that send email. Mailspy will serve a compliant, SMTP server that an application can connect to, while in development mode. It will accept emails using SMTP protocol, without encryption, and any basic authentication. A web interface is served to display them. + +Mailspy does not have any persistance, received emails are stored in memory and last only during the life of the run. It is meant to be lightweight coming in at ~10mb on disk and using 3-4mb of memory. It is meant to be launched along side a development stack on dev machines. + + ** Installation