app/src/socket_session.rs

81 lines
2.3 KiB
Rust

use super::events::{Connect, Disconnect, Event, EventBroker, ID};
use crate::auth::AuthClaims;
use actix::*;
use actix_web::{
web::{self, Data},
Error, HttpRequest, HttpResponse,
};
use actix_web_actors::ws;
use jsonwebtoken::{decode, DecodingKey, Validation};
pub struct EventSession {
id: ID,
}
impl Actor for EventSession {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let addr = ctx.address();
EventBroker::from_registry()
.send(Connect {
addr: addr.recipient(),
})
.into_actor(self)
.then(|res, act, ctx| {
match res {
Ok(res) => act.id = res,
// something is wrong. Burn it with fire.
_ => ctx.stop(),
}
fut::ready(())
})
.wait(ctx);
}
fn stopping(&mut self, _: &mut Self::Context) -> Running {
// notify suprivisor we are leavin
EventBroker::from_registry().do_send(Disconnect { id: self.id });
Running::Stop
}
}
impl Handler<Event> for EventSession {
type Result = ();
fn handle(&mut self, msg: Event, ctx: &mut Self::Context) -> Self::Result {
let data = serde_json::to_string(&msg).unwrap_or_default();
ctx.text(data);
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for EventSession {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
_ => (),
}
}
}
#[tracing::instrument(skip(req, stream, state))]
pub async fn event_session_index(
req: HttpRequest,
stream: web::Payload,
state: Data<crate::AppState>,
path: web::Path<String>,
) -> Result<HttpResponse, Error> {
let secret = crate::auth::get_secret(&state.db).await?;
let decoded = decode::<AuthClaims>(
&path,
&DecodingKey::from_secret(&secret),
&Validation::default(),
);
match decoded {
Ok(_) => ws::start(EventSession { id: 0 }, &req, stream),
Err(e) => Err(actix_web::error::ErrorUnauthorized(e.to_string())),
}
}