Compare commits

..

No commits in common. "5eec85509a8392b242657ccc461b55a494f932a1" and "b223d6c7eefd246b3df03c8975ea988579ea9f08" have entirely different histories.

8 changed files with 67 additions and 303 deletions

View File

@ -1,11 +1,6 @@
kind: pipeline
name: default
trigger:
event:
- push
- promote
steps:
- name: test
image: rust:latest

99
Cargo.lock generated
View File

@ -131,12 +131,6 @@ dependencies = [
"alloc-stdlib",
]
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bytes"
version = "1.2.1"
@ -338,21 +332,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.24"
@ -360,7 +339,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -369,17 +347,6 @@ version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf"
[[package]]
name = "futures-executor"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.24"
@ -415,7 +382,6 @@ version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@ -427,18 +393,6 @@ dependencies = [
"slab",
]
[[package]]
name = "futures_codec"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce54d63f8b0c75023ed920d46fd71d0cbbb830b0ee012726b5b4f506fb6dea5b"
dependencies = [
"bytes 0.5.6",
"futures",
"memchr",
"pin-project",
]
[[package]]
name = "generic-array"
version = "0.14.6"
@ -455,7 +409,7 @@ version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be"
dependencies = [
"bytes 1.2.1",
"bytes",
"fnv",
"futures-core",
"futures-sink",
@ -482,7 +436,7 @@ checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584"
dependencies = [
"base64",
"bitflags",
"bytes 1.2.1",
"bytes",
"headers-core",
"http",
"httpdate",
@ -537,7 +491,7 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
dependencies = [
"bytes 1.2.1",
"bytes",
"fnv",
"itoa",
]
@ -548,7 +502,7 @@ version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes 1.2.1",
"bytes",
"http",
"pin-project-lite",
]
@ -571,7 +525,7 @@ version = "0.14.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac"
dependencies = [
"bytes 1.2.1",
"bytes",
"futures-channel",
"futures-core",
"futures-util",
@ -694,8 +648,7 @@ name = "mailspy"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes 1.2.1",
"bytes",
"clap",
"lettre",
"mailparse",
@ -893,26 +846,6 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pin-project"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ef0f924a5ee7ea9cbcea77529dba45f8a9ba9f622419fe3386ca581a3ae9d5a"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "0.4.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
@ -939,7 +872,7 @@ checksum = "a94ff00c513bee5c32ecbbf982f470e7e51e913330737dc40522ddc298954395"
dependencies = [
"async-compression",
"async-trait",
"bytes 1.2.1",
"bytes",
"futures-util",
"headers",
"hex",
@ -958,7 +891,6 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"smallvec",
"sse-codec",
"thiserror",
"tokio",
"tokio-stream",
@ -1277,18 +1209,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "sse-codec"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84a59f811350c44b4a037aabeb72dc6a9591fc22aa95a036db9a96297c58085a"
dependencies = [
"bytes 0.5.6",
"futures-io",
"futures_codec",
"memchr",
]
[[package]]
name = "strsim"
version = "0.10.0"
@ -1386,7 +1306,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0020c875007ad96677dcc890298f4b942882c5d4eb7cc8f439fc3bf813dc9c95"
dependencies = [
"autocfg",
"bytes 1.2.1",
"bytes",
"libc",
"memchr",
"mio",
@ -1428,9 +1348,8 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
dependencies = [
"bytes 1.2.1",
"bytes",
"futures-core",
"futures-io",
"futures-sink",
"pin-project-lite",
"tokio",

View File

@ -11,13 +11,12 @@ serde = { version = "1.0.145", features = ["derive"] }
serde_json = "1.0.85"
tokio = { version = "1.21.1", features = ["full"] }
bytes = "1.2.1"
poem = { version = "1.3.44", features = ["compression", "embed", "test"] }
poem = { version = "1.3.44", features = ["compression", "embed"] }
rust-embed = "6.4.1"
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
mailparse = "0.13.8"
clap = { version = "3.2.22", features = ["derive"] }
async-trait = "0.1.57"
[dev-dependencies]
lettre = "0.10.1"

View File

@ -1,10 +1,6 @@
* Mailspy
Mailspy is a mock SMTP server for used during the development of applications that send email. Mailspy will serve a compliant, SMTP server that an application can connect to, while in development mode. It will accept emails using SMTP protocol, without encryption, and any basic authentication. A web interface is served to display them.
Mailspy does not have any persistance, received emails are stored in memory and last only during the life of the run. It is meant to be lightweight coming in at ~10mb on disk and using 3-4mb of memory. It is meant to be launched along side a development stack on dev machines.
Mailspy is a mock SMTP server for used during development of applications that send email. Mailspy will serve a compliant, mock SMTP server that an application can connect to, while in development mode. It will accept emails and display them in a web interface that it serves.
** Installation

View File

@ -40,55 +40,3 @@ pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> {
});
Ok(())
}
#[cfg(test)]
mod tests {
use super::{clear, messages};
use crate::mail::{Mail, Mailbox};
use poem::{
middleware::AddData,
test::{TestClient, TestJson},
EndpointExt, Route,
};
async fn request_data<'a>(url: &str, mb: &Mailbox) -> TestJson {
let app = Route::new()
.at("/messages", messages)
.at("/clear", clear)
.with(AddData::new(mb.clone()));
let client = TestClient::new(app);
let resp = client.get(url).send().await;
resp.assert_status_is_ok();
resp.json().await
}
async fn request_void<'a>(url: &str, mb: &Mailbox) {
let app = Route::new()
.at("/messages", messages)
.at("/clear", clear)
.with(AddData::new(mb.clone()));
let client = TestClient::new(app);
let resp = client.get(url).send().await;
resp.assert_status_is_ok();
}
#[tokio::test]
async fn test_messages_endpoint() {
let mb = Mailbox::new();
let resp = request_data("/messages", &mb).await;
let data = resp.value();
data.array().assert_is_empty();
mb.store(Mail::default()).await;
let resp = request_data("/messages", &mb).await;
let data = resp.value();
assert_eq!(data.array().len(), 1);
}
#[tokio::test]
async fn test_clear_endpoint() {
let mb = Mailbox::new();
mb.store(Mail::default()).await;
request_void("/clear", &mb).await;
assert_eq!(mb.len().await, 0);
}
}

View File

@ -40,10 +40,4 @@ impl Mailbox {
let inner = self.0.lock().await;
inner.clone()
}
#[cfg(test)]
pub async fn len(&self) -> usize {
let inner = self.0.lock().await;
inner.len()
}
}

View File

@ -1,6 +1,6 @@
use bytes::{Buf, BytesMut};
use mailparse::MailHeaderMap;
use std::fmt::{Display, Write as _};
use std::fmt::Write as _;
use std::io::Cursor;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufWriter},
@ -15,7 +15,7 @@ pub enum ConnectionState {
Data,
}
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum Frame {
Header,
Raw(String),
@ -30,25 +30,11 @@ pub enum Frame {
Close,
}
impl Display for Frame {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Frame::Ok(v) => write!(f, "250 {}\r\n", v),
Frame::Raw(v) => write!(f, "{}\r\n", v),
Frame::Close => write!(f, "221 Closing connection\r\n"),
Frame::Header => write!(f, "220 Mailspy Test Server\r\n"),
Frame::StartMailInput => write!(f, "354 Start mail input\r\n"),
_ => Ok(()),
}
}
}
impl Frame {
pub fn check(buf: &mut Cursor<&[u8]>) -> anyhow::Result<()> {
get_line(buf)?;
Ok(())
}
pub fn parse(buf: &mut Cursor<&[u8]>) -> anyhow::Result<Frame> {
let line = get_line(buf)?.to_vec();
let string = String::from_utf8(line)?;
@ -74,20 +60,27 @@ impl Frame {
}
}
#[async_trait::async_trait]
pub trait Transmitter {
async fn read_frame(&mut self) -> anyhow::Result<Option<Frame>>;
async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()>;
}
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
mail_from: String,
rcpt_to: String,
data: String,
state: ConnectionState,
}
#[async_trait::async_trait]
impl Transmitter for Connection {
async fn read_frame(&mut self) -> anyhow::Result<Option<Frame>> {
impl Connection {
pub fn new(socket: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(socket),
buffer: BytesMut::with_capacity(4 * 1024),
state: ConnectionState::Commands,
mail_from: String::default(),
rcpt_to: String::default(),
data: String::default(),
}
}
pub async fn read_frame(&mut self) -> anyhow::Result<Option<Frame>> {
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
@ -104,18 +97,33 @@ impl Transmitter for Connection {
}
async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()> {
self.stream.write_all(frame.to_string().as_bytes()).await?;
self.stream.flush().await?;
Ok(())
match frame {
Frame::Raw(val) => {
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Ok(val) => {
self.stream.write_all(b"250 ").await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Close => {
self.stream.write_all(b"221 Closing connection").await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Header => {
self.stream.write_all(b"220 Mailspy Test Server").await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::StartMailInput => {
self.stream.write_all(b"354 Start mail input").await?;
self.stream.write_all(b"\r\n").await?;
}
_ => {}
}
impl Connection {
pub fn new(socket: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(socket),
buffer: BytesMut::with_capacity(4 * 1024),
}
self.stream.flush().await?;
Ok(())
}
fn parse_frame(&mut self) -> anyhow::Result<Option<Frame>> {
@ -142,7 +150,7 @@ pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> {
if let Ok((socket, _)) = listener.accept().await {
let mb = mailbox.clone();
tokio::spawn(async move {
if let Err(e) = process(Connection::new(socket), mb).await {
if let Err(e) = process(socket, mb).await {
tracing::error!("Mail processing error: {}", e);
}
});
@ -152,42 +160,40 @@ pub async fn server(mailbox: Mailbox, port: u16) -> anyhow::Result<()> {
Ok(())
}
async fn process(mut connection: impl Transmitter, mailbox: Mailbox) -> anyhow::Result<()> {
async fn process(socket: TcpStream, mailbox: Mailbox) -> anyhow::Result<()> {
let mut connection = Connection::new(socket);
connection.write_frame(&Frame::Header).await.unwrap();
let mut mail_from = String::new();
let mut rcpt_to = String::new();
let mut state = ConnectionState::Commands;
let mut data = String::new();
loop {
if let Some(frame) = connection.read_frame().await.unwrap() {
tracing::info!("Frame read: {:?}", frame);
match frame {
Frame::Ehlo(_) => {
connection
.write_frame(&Frame::Ok("EHLO".to_string()))
.write_frame(&Frame::Ok("Good to go".to_string()))
.await?;
}
Frame::From(val) => {
mail_from = val;
connection.mail_from = val;
connection
.write_frame(&Frame::Ok("2.1.0 Sender OK".to_string()))
.await?;
}
Frame::To(val) => {
rcpt_to = val;
connection.rcpt_to = val;
connection
.write_frame(&Frame::Ok("2.1.5 Recipient OK".to_string()))
.await?;
}
Frame::DataStart => {
connection.write_frame(&Frame::StartMailInput).await?;
state = ConnectionState::Data;
connection.state = ConnectionState::Data;
}
Frame::DataEnd => {
connection
.write_frame(&Frame::Ok("Mail sent".to_string()))
.await?;
let mut mail = Mail::default();
let data = data.clone();
let data = connection.data.clone();
let msg = mailparse::parse_mail(data.as_bytes())?;
if msg.subparts.is_empty() {
mail.body = vec![MailPart {
@ -216,18 +222,18 @@ async fn process(mut connection: impl Transmitter, mailbox: Mailbox) -> anyhow::
mail.from = msg
.headers
.get_first_value("From")
.unwrap_or_else(|| mail_from.clone());
.unwrap_or_else(|| connection.mail_from.clone());
mail.to = msg
.headers
.get_first_value("To")
.unwrap_or_else(|| rcpt_to.clone());
.unwrap_or_else(|| connection.rcpt_to.clone());
mail.date = msg.headers.get_first_value("Date").unwrap_or_default();
mailbox.store(mail).await;
connection.write_frame(&Frame::Close).await?;
break Ok(());
}
Frame::Raw(s) if state == ConnectionState::Data => {
writeln!(data, "{}", s)?;
Frame::Raw(s) if connection.state == ConnectionState::Data => {
writeln!(connection.data, "{}", s)?;
}
_ => {
connection.write_frame(&Frame::Ok("OK".to_string())).await?;
@ -259,95 +265,3 @@ fn get_line<'a>(src: &mut Cursor<&'a [u8]>) -> anyhow::Result<&'a [u8]> {
Err(anyhow::anyhow!("Incomplete"))
}
#[cfg(test)]
mod tests {
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use super::*;
struct TestConnection {
pub in_rx: UnboundedReceiver<Frame>,
pub out_tx: UnboundedSender<Frame>,
}
#[async_trait::async_trait]
impl Transmitter for TestConnection {
async fn read_frame(&mut self) -> anyhow::Result<Option<Frame>> {
Ok(self.in_rx.recv().await)
}
async fn write_frame(&mut self, frame: &Frame) -> anyhow::Result<()> {
self.out_tx.send(frame.clone())?;
Ok(())
}
}
impl TestConnection {
pub fn setup() -> (UnboundedSender<Frame>, UnboundedReceiver<Frame>, Self) {
let (in_tx, in_rx) = tokio::sync::mpsc::unbounded_channel();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
let s = Self { in_rx, out_tx };
(in_tx, out_rx, s)
}
}
#[tokio::test]
async fn test_process() {
let mb = Mailbox::new();
let (tx, mut rx, conn) = TestConnection::setup();
tokio::spawn(process(conn, mb.clone()));
assert!(matches!(rx.recv().await, Some(Frame::Header)));
tx.send(Frame::Ehlo("Test mail client".to_string()))
.unwrap();
assert!(matches!(rx.recv().await, Some(Frame::Ok(_))));
tx.send(Frame::From("alice@alison.com".to_string()))
.unwrap();
assert!(matches!(rx.recv().await, Some(Frame::Ok(_))));
tx.send(Frame::To("alice@alison.com".to_string())).unwrap();
assert!(matches!(rx.recv().await, Some(Frame::Ok(_))));
tx.send(Frame::To("alice@alison.com".to_string())).unwrap();
tx.send(Frame::DataStart).unwrap();
tx.send(Frame::Raw("body".to_string())).unwrap();
tx.send(Frame::DataEnd).unwrap();
assert!(matches!(rx.recv().await, Some(Frame::Ok(_))));
}
#[test]
fn test_get_line() {
let mut c = Cursor::new("First line\nSecond line\nThird line\n".as_bytes());
assert_eq!(get_line(&mut c).unwrap(), "First line".as_bytes());
assert_eq!(get_line(&mut c).unwrap(), "Second line".as_bytes());
assert_eq!(get_line(&mut c).unwrap(), "Third line".as_bytes());
}
#[test]
fn test_frame_parse() {
assert!(matches!(
Frame::parse(&mut Cursor::new("EHLO example.com\n".as_bytes())).unwrap(),
Frame::Ehlo(_),
));
assert!(matches!(
Frame::parse(&mut Cursor::new("RCPT TO: alice@example.com\n".as_bytes())).unwrap(),
Frame::To(_),
));
assert!(matches!(
Frame::parse(&mut Cursor::new(
"MAIL FROM: alice@example.com\n".as_bytes()
))
.unwrap(),
Frame::From(_),
));
assert!(matches!(
Frame::parse(&mut Cursor::new("DATA\n".as_bytes())).unwrap(),
Frame::DataStart,
));
assert!(matches!(
Frame::parse(&mut Cursor::new(".\n".as_bytes())).unwrap(),
Frame::DataEnd,
));
assert!(matches!(
Frame::parse(&mut Cursor::new("QUIT\n".as_bytes())).unwrap(),
Frame::Quit,
));
}
}

View File

@ -13,6 +13,5 @@
:global(html, body) {
padding: 0;
margin: 0;
overflow-x: hidden;
}
</style>