app/src/main.rs

230 lines
7.7 KiB
Rust

use std::{collections::HashMap, path::Path};
use actix::SystemService;
use actix_web::{
get,
web::{self, Data},
App, HttpResponse, HttpServer,
};
use rust_embed::RustEmbed;
use sea_orm::{prelude::*, Database};
use tokio::sync::Mutex;
use tracing::{info, instrument};
use tracing_subscriber::prelude::*;
mod api;
mod auth;
mod entity;
mod error;
mod events;
mod socket_session;
#[cfg(all(target_env = "musl", target_pointer_width = "64"))]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[derive(Debug)]
pub struct AppState {
pub db: DatabaseConnection,
pub healthcheck_status: Mutex<HashMap<i32, bool>>,
pub data_path: String,
}
#[actix_rt::main]
async fn main() {
let opts = clap::App::new("Vade Mecum")
.arg(
clap::Arg::new("port")
.short('p')
.env("VADE_PORT")
.long("port")
.value_name("number")
.default_value("8089")
.help("Set the port for the HTTP server")
.takes_value(true),
)
.arg(
clap::Arg::new("data")
.env("VADE_DB")
.long("data")
.value_name("path")
.default_value("./")
.help("Sets the path to the database location")
.takes_value(true),
)
.get_matches();
let subscriber = tracing_subscriber::registry().with(
tracing_subscriber::fmt::Layer::new()
.pretty()
.with_writer(std::io::stdout)
.with_ansi(true)
.with_filter(tracing_subscriber::filter::LevelFilter::INFO),
);
tracing::subscriber::set_global_default(subscriber).expect("Unable to set a global collector");
let db = setup_database(opts.value_of("data").unwrap_or_default())
.await
.unwrap();
let state = web::Data::new(AppState {
db,
healthcheck_status: Mutex::new(HashMap::new()),
data_path: opts.value_of("data").unwrap_or_default().to_string(),
});
info!(
"Starting http server on {}",
opts.value_of("port").unwrap_or_default()
);
let st = state.clone();
actix_rt::spawn(async move {
loop {
let apps: Vec<entity::application::Model> = entity::application::Entity::find()
.filter(entity::application::Column::EnableHealthcheck.eq(true))
.all(&st.db)
.await
.unwrap();
let client = reqwest::Client::builder().build().unwrap();
for app in apps {
match client
.request(reqwest::Method::GET, app.url)
.timeout(std::time::Duration::from_secs(5))
.send()
.await
{
Ok(res) if res.status() == 200 => {
if !st
.healthcheck_status
.lock()
.await
.get(&app.id)
.unwrap_or(&false)
{
st.healthcheck_status.lock().await.insert(app.id, true);
let _ = events::EventBroker::from_registry()
.send(events::Event::HealthcheckChange {
app_id: app.id,
alive: true,
})
.await;
}
}
Err(e) => {
tracing::warn!("Error performing healthcheck: {}", e);
st.healthcheck_status.lock().await.insert(app.id, false);
let _ = events::EventBroker::from_registry()
.send(events::Event::HealthcheckChange {
app_id: app.id,
alive: false,
})
.await;
}
Ok(res) => {
if *st
.healthcheck_status
.lock()
.await
.get(&app.id)
.unwrap_or(&true)
{
tracing::warn!("Non 200 status code: {}", res.status());
st.healthcheck_status.lock().await.insert(app.id, false);
let _ = events::EventBroker::from_registry()
.send(events::Event::HealthcheckChange {
app_id: app.id,
alive: false,
})
.await;
}
}
}
}
tokio::time::sleep(tokio::time::Duration::from_secs(300)).await;
}
});
let listen_host = format!("0.0.0.0:{}", opts.value_of("port").unwrap_or_default());
HttpServer::new(move || {
let cors = actix_cors::Cors::permissive();
App::new()
.wrap(cors)
.app_data(state.clone())
.wrap(tracing_actix_web::TracingLogger::default())
.service(get_bg)
.service(web::resource("/events/{token}").to(socket_session::event_session_index))
.service(api::routes())
.service(dist)
})
.bind(listen_host)
.unwrap()
.run()
.await
.expect("Couldnt launch server");
}
#[derive(RustEmbed)]
#[folder = "dist"]
struct UIAssets;
#[instrument]
fn read_file(path: &Path) -> error::Result<Vec<u8>> {
let f = std::fs::File::open(path)?;
let mut reader = std::io::BufReader::new(f);
let mut buffer = Vec::new();
std::io::Read::read_to_end(&mut reader, &mut buffer)?;
tracing::info!("Read into buffer: {}", buffer.len());
Ok(buffer)
}
#[get("/bg.jpg")]
async fn get_bg(state: Data<AppState>) -> error::Result<HttpResponse> {
let file_path = Path::new(&state.data_path).join("bg.jpg");
if file_path.exists() {
let data = read_file(&file_path).unwrap();
Ok(HttpResponse::Ok().content_type("image/jpg").body(data))
} else {
let content = UIAssets::get("bg.jpg").unwrap();
let body: actix_web::body::BoxBody = match content {
std::borrow::Cow::Borrowed(bytes) => actix_web::body::BoxBody::new(bytes),
std::borrow::Cow::Owned(bytes) => actix_web::body::BoxBody::new(bytes),
};
Ok(HttpResponse::Ok().content_type("image/jpg").body(body))
}
}
#[get("/{filename:.*}")]
async fn dist(path: web::Path<String>) -> HttpResponse {
let path = if UIAssets::get(&*path).is_some() {
&*path
} else {
"index.html"
};
let content = UIAssets::get(path).unwrap();
let body: actix_web::body::BoxBody = match content {
std::borrow::Cow::Borrowed(bytes) => actix_web::body::BoxBody::new(bytes),
std::borrow::Cow::Owned(bytes) => actix_web::body::BoxBody::new(bytes),
};
HttpResponse::Ok()
.content_type(mime_guess::from_path(path).first_or_octet_stream().as_ref())
.body(body)
}
#[instrument]
async fn setup_database(db_path: &str) -> error::Result<DatabaseConnection> {
let db_fname = "data.db";
let full_path = Path::new(db_path).join(db_fname);
if !full_path.exists() {
std::fs::File::create(full_path.clone())?;
}
let conn = format!("sqlite://{}", full_path.to_str().unwrap());
let pool = sqlx::SqlitePool::connect(&conn).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
tracing::info!("Database migrated");
Ok(Database::connect(&conn).await?)
}