use std::{collections::HashMap, path::Path}; use actix::SystemService; use actix_web::{ get, web::{self, Data}, App, HttpResponse, HttpServer, }; use rust_embed::RustEmbed; use sea_orm::{prelude::*, Database}; use tokio::sync::Mutex; use tracing::{info, instrument}; use tracing_subscriber::prelude::*; mod api; mod auth; mod entity; mod error; mod events; mod socket_session; #[cfg(all(target_env = "musl", target_pointer_width = "64"))] #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; #[derive(Debug)] pub struct AppState { pub db: DatabaseConnection, pub healthcheck_status: Mutex>, pub data_path: String, } #[actix_rt::main] async fn main() { let opts = clap::App::new("Vade Mecum") .arg( clap::Arg::new("port") .short('p') .env("VADE_PORT") .long("port") .value_name("number") .default_value("8089") .help("Set the port for the HTTP server") .takes_value(true), ) .arg( clap::Arg::new("data") .env("VADE_DB") .long("data") .value_name("path") .default_value("./") .help("Sets the path to the database location") .takes_value(true), ) .get_matches(); let subscriber = tracing_subscriber::registry().with( tracing_subscriber::fmt::Layer::new() .pretty() .with_writer(std::io::stdout) .with_ansi(true) .with_filter(tracing_subscriber::filter::LevelFilter::INFO), ); tracing::subscriber::set_global_default(subscriber).expect("Unable to set a global collector"); let db = setup_database(opts.value_of("data").unwrap_or_default()) .await .unwrap(); let state = web::Data::new(AppState { db, healthcheck_status: Mutex::new(HashMap::new()), data_path: opts.value_of("data").unwrap_or_default().to_string(), }); info!( "Starting http server on {}", opts.value_of("port").unwrap_or_default() ); let st = state.clone(); actix_rt::spawn(async move { loop { let apps: Vec = entity::application::Entity::find() .filter(entity::application::Column::EnableHealthcheck.eq(true)) .all(&st.db) .await .unwrap(); let client = reqwest::Client::builder().build().unwrap(); for app in apps { match client .request(reqwest::Method::GET, app.url) .timeout(std::time::Duration::from_secs(5)) .send() .await { Ok(res) if res.status() == 200 => { if !st .healthcheck_status .lock() .await .get(&app.id) .unwrap_or(&false) { st.healthcheck_status.lock().await.insert(app.id, true); let _ = events::EventBroker::from_registry() .send(events::Event::HealthcheckChange { app_id: app.id, alive: true, }) .await; } } Err(e) => { tracing::warn!("Error performing healthcheck: {}", e); st.healthcheck_status.lock().await.insert(app.id, false); let _ = events::EventBroker::from_registry() .send(events::Event::HealthcheckChange { app_id: app.id, alive: false, }) .await; } Ok(res) => { if *st .healthcheck_status .lock() .await .get(&app.id) .unwrap_or(&true) { tracing::warn!("Non 200 status code: {}", res.status()); st.healthcheck_status.lock().await.insert(app.id, false); let _ = events::EventBroker::from_registry() .send(events::Event::HealthcheckChange { app_id: app.id, alive: false, }) .await; } } } } tokio::time::sleep(tokio::time::Duration::from_secs(300)).await; } }); let listen_host = format!("0.0.0.0:{}", opts.value_of("port").unwrap_or_default()); HttpServer::new(move || { let cors = actix_cors::Cors::permissive(); App::new() .wrap(cors) .app_data(state.clone()) .wrap(tracing_actix_web::TracingLogger::default()) .service(get_bg) .service(web::resource("/events/{token}").to(socket_session::event_session_index)) .service(api::routes()) .service(dist) }) .bind(listen_host) .unwrap() .run() .await .expect("Couldnt launch server"); } #[derive(RustEmbed)] #[folder = "dist"] struct UIAssets; #[instrument] fn read_file(path: &Path) -> error::Result> { let f = std::fs::File::open(path)?; let mut reader = std::io::BufReader::new(f); let mut buffer = Vec::new(); std::io::Read::read_to_end(&mut reader, &mut buffer)?; tracing::info!("Read into buffer: {}", buffer.len()); Ok(buffer) } #[get("/bg.jpg")] async fn get_bg(state: Data) -> error::Result { let file_path = Path::new(&state.data_path).join("bg.jpg"); if file_path.exists() { let data = read_file(&file_path).unwrap(); Ok(HttpResponse::Ok().content_type("image/jpg").body(data)) } else { let content = UIAssets::get("bg.jpg").unwrap(); let body: actix_web::body::BoxBody = match content { std::borrow::Cow::Borrowed(bytes) => actix_web::body::BoxBody::new(bytes), std::borrow::Cow::Owned(bytes) => actix_web::body::BoxBody::new(bytes), }; Ok(HttpResponse::Ok().content_type("image/jpg").body(body)) } } #[get("/{filename:.*}")] async fn dist(path: web::Path) -> HttpResponse { let path = if UIAssets::get(&*path).is_some() { &*path } else { "index.html" }; let content = UIAssets::get(path).unwrap(); let body: actix_web::body::BoxBody = match content { std::borrow::Cow::Borrowed(bytes) => actix_web::body::BoxBody::new(bytes), std::borrow::Cow::Owned(bytes) => actix_web::body::BoxBody::new(bytes), }; HttpResponse::Ok() .content_type(mime_guess::from_path(path).first_or_octet_stream().as_ref()) .body(body) } #[instrument] async fn setup_database(db_path: &str) -> error::Result { let db_fname = "data.db"; let full_path = Path::new(db_path).join(db_fname); if !full_path.exists() { std::fs::File::create(full_path.clone())?; } let conn = format!("sqlite://{}", full_path.to_str().unwrap()); let pool = sqlx::SqlitePool::connect(&conn).await?; sqlx::migrate!("./migrations").run(&pool).await?; tracing::info!("Database migrated"); Ok(Database::connect(&conn).await?) }